diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index b3f148d..9d2fc3b 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.3.0" +version = "0.3.1-aa.1" authors = ["Dominik Werder "] edition = "2024" diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 8fcd0cd..beca3fc 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -160,6 +160,8 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { } async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), Error> { + todo!("scylla_schema_check config"); + let scy_confs = opts.scylla_insert_set_conf(0); let opstr = if do_change { "change" } else { "check" }; info!("start scylla schema {}", opstr); info!("{:?}", opts.scylla_config_st()); diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 50f7506..eb5411d 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -5,13 +5,14 @@ use async_channel::Sender; use channeltools::channel_combine_ab::ChannelCombineAB; use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; -use log::*; +use log; use netfetch::ca::connset::CaConnSet; use netfetch::ca::connset::CaConnSetCtrl; use netfetch::ca::connset::CaConnSetItem; use netfetch::conf::CaIngestOpts; use netfetch::conf::ChannelConfig; use netfetch::conf::ChannelsConfig; +use netfetch::conf::ScyllaInsertsetConf; use netfetch::daemon_common::ChannelName; use netfetch::daemon_common::DaemonEvent; use netfetch::metrics::RoutesResources; @@ -19,7 +20,6 @@ use netfetch::metrics::StatsSet; use netfetch::throttletrace::ThrottleTrace; use netpod::Database; use netpod::ttl::RetentionTime; -use scywr::config::ScyllaIngestConfig; use scywr::insertqueues::InsertQueuesRx; use scywr::insertqueues::InsertQueuesTx; use scywr::insertworker::InsertWorkerOpts; @@ -30,21 +30,20 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; use std::time::Duration; use std::time::Instant; -use std::time::SystemTime; use taskrun::tokio; use tokio::task::JoinHandle; const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(5000); const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000); -const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000); const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500); +macro_rules! error { ($($arg:tt)*) => { if true { log::error!($($arg)*); } }; } +macro_rules! warn { ($($arg:tt)*) => { if true { log::warn!($($arg)*); } }; } +macro_rules! info { ($($arg:tt)*) => { if true { log::info!($($arg)*); } }; } +macro_rules! debug { ($($arg:tt)*) => { if true { log::debug!($($arg)*); } }; } + pub struct DaemonOpts { pgconf: Database, - scyconf_st_rf3: ScyllaIngestConfig, - scyconf_st_rf1: ScyllaIngestConfig, - scyconf_mt: ScyllaIngestConfig, - scyconf_lt: ScyllaIngestConfig, #[allow(unused)] test_bsread_addr: Option, insert_frac: Arc, @@ -56,14 +55,6 @@ pub struct Daemon { ingest_opts: CaIngestOpts, tx: Sender, rx: Receiver, - insert_queue_counter: Arc, - count_unknown_address: usize, - count_search_pending: usize, - count_search_sent: usize, - count_no_address: usize, - count_unassigned: usize, - count_assigned: usize, - last_status_print: SystemTime, insert_workers_jhs: Vec>>, shutting_down: bool, connset_ctrl: CaConnSetCtrl, @@ -83,41 +74,12 @@ pub struct Daemon { impl Daemon { pub async fn new(opts: DaemonOpts, ingest_opts: CaIngestOpts) -> Result { let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32); - // TODO keep join handles and await later let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers::(2, &opts.pgconf) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - - // TODO so far a dummy - let (series_conf_by_id_tx, _series_conf_by_id_rx) = async_channel::bounded(16); - - let insert_queue_counter = Arc::new(AtomicUsize::new(0)); - let local_epics_hostname = ingest_linux::net::local_hostname(); - - #[cfg(target_abi = "x32")] - let query_item_rx = { - // TODO only testing, remove - tokio::spawn({ - let rx = query_item_rx; - async move { - while let Ok(item) = rx.recv().await { - drop(item); - } - } - }); - let (tx, rx) = async_channel::bounded(128); - tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_millis(2000)).await; - tx.len(); - } - }); - rx - }; - let insert_worker_opts = InsertWorkerOpts { store_workers_rate: opts.store_workers_rate.clone(), insert_workers_running: Arc::new(AtomicU64::new(0)), @@ -126,6 +88,9 @@ impl Daemon { }; let insert_worker_opts = Arc::new(insert_worker_opts); + // TODO so far a dummy + let (series_conf_by_id_tx, _series_conf_by_id_rx) = async_channel::bounded(16); + let (iqtx, iqrx) = { let (st_rf3_tx, st_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); let (st_rf1_tx, st_rf1_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); @@ -182,14 +147,65 @@ impl Daemon { } }); - // let query_item_tx_weak = query_item_tx.downgrade(); // Insert queue hook // let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx); let ignore_writes = ingest_opts.scylla_ignore_writes(); let (insert_worker_output_tx, insert_worker_output_rx) = async_channel::bounded(256); - let mut insert_worker_jhs = Vec::new(); + + // TODO join these + let mut insert_workers_jhs = Vec::new(); + + async fn spawn_scylla_insert_workers( + scylla_set: &ScyllaInsertsetConf, + ingest_opts: &CaIngestOpts, + insert_worker_opts: &Arc, + iqrx: InsertQueuesRx, + ignore_writes: bool, + insert_worker_output_tx: Sender, + ) -> Result>>, Error> { + let rts = [ + RetentionTime::Short, + RetentionTime::Short, + RetentionTime::Medium, + RetentionTime::Long, + RetentionTime::Long, + ]; + let scyconfs = [ + scylla_set.st_rf1().clone(), + scylla_set.st_rf3().clone(), + scylla_set.mt_rf3().clone(), + scylla_set.lt_rf3().clone(), + scylla_set.lt_rf3().clone(), + ]; + let rxs = [ + iqrx.st_rf1_rx, + iqrx.st_rf3_rx, + iqrx.mt_rf3_rx, + iqrx.lt_rf3_rx, + iqrx.lt_rf3_lat5_rx, + ]; + let mut jhs = Vec::new(); + for (rt, (scyconf, rx)) in rts.into_iter().zip(scyconfs.into_iter().zip(rxs.into_iter())) { + let jh = scywr::insertworker::spawn_scylla_insert_workers( + rt, + scyconf, + ingest_opts.insert_scylla_sessions(), + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), + rx, + insert_worker_opts.clone(), + ingest_opts.use_rate_limit_queue(), + ignore_writes, + insert_worker_output_tx.clone(), + ) + .await + .map_err(Error::from_string)?; + jhs.extend(jh); + } + Ok(jhs) + } if ingest_opts.scylla_disable() { let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( @@ -201,7 +217,7 @@ impl Daemon { ) .await .map_err(Error::from_string)?; - insert_worker_jhs.extend(jh); + insert_workers_jhs.extend(jh); let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( ingest_opts.insert_worker_count(), ingest_opts.insert_worker_concurrency(), @@ -211,7 +227,7 @@ impl Daemon { ) .await .map_err(Error::from_string)?; - insert_worker_jhs.extend(jh); + insert_workers_jhs.extend(jh); let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( ingest_opts.insert_worker_count(), ingest_opts.insert_worker_concurrency(), @@ -221,7 +237,7 @@ impl Daemon { ) .await .map_err(Error::from_string)?; - insert_worker_jhs.extend(jh); + insert_workers_jhs.extend(jh); let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( ingest_opts.insert_worker_count(), ingest_opts.insert_worker_concurrency(), @@ -231,7 +247,7 @@ impl Daemon { ) .await .map_err(Error::from_string)?; - insert_worker_jhs.extend(jh); + insert_workers_jhs.extend(jh); let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( ingest_opts.insert_worker_count(), ingest_opts.insert_worker_concurrency(), @@ -241,73 +257,44 @@ impl Daemon { ) .await .map_err(Error::from_string)?; - insert_worker_jhs.extend(jh); + insert_workers_jhs.extend(jh); } else { - let jh = scywr::insertworker::spawn_scylla_insert_workers( - RetentionTime::Short, - opts.scyconf_st_rf1.clone(), - ingest_opts.insert_scylla_sessions(), - ingest_opts.insert_worker_count(), - ingest_opts.insert_worker_concurrency(), - iqrx.st_rf1_rx, - insert_worker_opts.clone(), - ingest_opts.use_rate_limit_queue(), - ignore_writes, - insert_worker_output_tx.clone(), - ) - .await - .map_err(Error::from_string)?; - insert_worker_jhs.extend(jh); - - let jh = scywr::insertworker::spawn_scylla_insert_workers( - RetentionTime::Short, - opts.scyconf_st_rf3.clone(), - ingest_opts.insert_scylla_sessions(), - ingest_opts.insert_worker_count(), - ingest_opts.insert_worker_concurrency(), - iqrx.st_rf3_rx, - insert_worker_opts.clone(), - ingest_opts.use_rate_limit_queue(), - ignore_writes, - insert_worker_output_tx.clone(), - ) - .await - .map_err(Error::from_string)?; - insert_worker_jhs.extend(jh); - - let jh = scywr::insertworker::spawn_scylla_insert_workers( - RetentionTime::Medium, - opts.scyconf_mt.clone(), - ingest_opts.insert_scylla_sessions(), - ingest_opts.insert_worker_count(), - ingest_opts.insert_worker_concurrency(), - iqrx.mt_rf3_rx, - insert_worker_opts.clone(), - ingest_opts.use_rate_limit_queue(), - ignore_writes, - insert_worker_output_tx.clone(), - ) - .await - .map_err(Error::from_string)?; - insert_worker_jhs.extend(jh); - - let lt_rx_combined = ChannelCombineAB::new(iqrx.lt_rf3_rx, iqrx.lt_rf3_lat5_rx); - - let jh = scywr::insertworker::spawn_scylla_insert_workers( - RetentionTime::Long, - opts.scyconf_lt.clone(), - ingest_opts.insert_scylla_sessions(), - ingest_opts.insert_worker_count(), - ingest_opts.insert_worker_concurrency(), - lt_rx_combined, - insert_worker_opts.clone(), - ingest_opts.use_rate_limit_queue(), - ignore_writes, - insert_worker_output_tx.clone(), - ) - .await - .map_err(Error::from_string)?; - insert_worker_jhs.extend(jh); + let scyset1 = ingest_opts.scylla_insert_set_conf(0).unwrap(); + if let Some(scyset2) = ingest_opts.scylla_insert_set_conf(1) { + let (iqrx1, iqrx2) = iqrx.clone_2(); + let jhs = spawn_scylla_insert_workers( + &scyset1, + &ingest_opts, + &insert_worker_opts, + iqrx1, + ignore_writes, + insert_worker_output_tx.clone(), + ) + .await?; + insert_workers_jhs.extend(jhs); + let jhs = spawn_scylla_insert_workers( + &scyset2, + &ingest_opts, + &insert_worker_opts, + iqrx2, + ignore_writes, + insert_worker_output_tx, + ) + .await?; + insert_workers_jhs.extend(jhs); + } else { + let jhs = spawn_scylla_insert_workers( + &scyset1, + &ingest_opts, + &insert_worker_opts, + iqrx, + ignore_writes, + insert_worker_output_tx.clone(), + ) + .await?; + insert_workers_jhs.extend(jhs); + } + // let lt_rx_combined = ChannelCombineAB::new(iqrx.lt_rf3_rx, iqrx.lt_rf3_lat5_rx); }; #[cfg(feature = "bsread")] @@ -345,26 +332,10 @@ impl Daemon { { // TODO join the task - let tx = daemon_ev_tx.clone(); - tokio::task::spawn(async move { - loop { - match insert_worker_output_rx.recv().await { - Ok(x) => { - match tx.send(DaemonEvent::ScyllaInsertWorkerOutput(x)).await { - Ok(()) => {} - Err(_) => { - // TODO - break; - } - } - } - Err(_) => { - // TODO - break; - } - } - } - }); + let jh = tokio::task::spawn(Self::insert_worker_out_merge( + insert_worker_output_rx, + daemon_ev_tx.clone(), + )); } let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8); @@ -373,15 +344,7 @@ impl Daemon { ingest_opts, tx: daemon_ev_tx, rx: daemon_ev_rx, - insert_queue_counter, - count_unknown_address: 0, - count_search_pending: 0, - count_search_sent: 0, - count_no_address: 0, - count_unassigned: 0, - count_assigned: 0, - last_status_print: SystemTime::now(), - insert_workers_jhs: insert_worker_jhs, + insert_workers_jhs, shutting_down: false, connset_ctrl: conn_set_ctrl, connset_status_last: Instant::now(), @@ -397,6 +360,29 @@ impl Daemon { Ok(ret) } + async fn insert_worker_out_merge( + rx: Receiver, + tx: Sender, + ) { + loop { + match rx.recv().await { + Ok(x) => { + match tx.send(DaemonEvent::ScyllaInsertWorkerOutput(x)).await { + Ok(()) => {} + Err(_) => { + // TODO + break; + } + } + } + Err(_) => { + // TODO + break; + } + } + } + } + async fn check_health(&mut self, ts1: Instant) -> Result<(), Error> { self.check_health_connset(ts1)?; Ok(()) @@ -484,20 +470,8 @@ impl Daemon { async fn handle_timer_tick(&mut self) -> Result<(), Error> { if self.shutting_down { let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire); - #[cfg(feature = "DISABLED")] - { - let nitems = self - .query_item_tx_weak - .upgrade() - .map(|x| (x.sender_count(), x.receiver_count(), x.len())); - info!("qu senders A nworkers {} nitems {:?}", nworkers, nitems); - } - if nworkers == 0 { - info!("goodbye"); - std::process::exit(0); - } + log::info!("shutting_down insert_workers_running {nworkers}"); } - let tsnow = SystemTime::now(); { let n = SIGINT.load(atomic::Ordering::Acquire); let m = SIGINT_CONFIRM.load(atomic::Ordering::Acquire); @@ -510,29 +484,18 @@ impl Daemon { warn!("Received SIGTERM"); SIGTERM.store(2, atomic::Ordering::Release); } - let ts1 = Instant::now(); - self.check_health(ts1).await?; - let dt = ts1.elapsed(); - if dt > CHECK_CHANNEL_SLOW_WARN { - info!("slow check_chans {:.0} ms", dt.as_secs_f32() * 1e3); - } - if false && tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= PRINT_STATUS_INTERVAL { - self.last_status_print = tsnow; - info!( - "{:8} {:8} {:8} : {:8} : {:8} {:8} : {:10}", - self.count_unknown_address, - self.count_search_pending, - self.count_search_sent, - self.count_no_address, - self.count_unassigned, - self.count_assigned, - self.insert_queue_counter.load(atomic::Ordering::Acquire) - ); + { + let ts1 = Instant::now(); + self.check_health(ts1).await?; + let dt = ts1.elapsed(); + if dt > CHECK_CHANNEL_SLOW_WARN { + info!("slow check_chans {:.0} ms", dt.as_secs_f32() * 1e3); + } } let iqtxm = self .iqtx .as_ref() - .map(|x| netfetch::metrics::types::InsertQueuesTxMetrics::from(x)); + .map(|x| scywr::insertqueues::InsertQueuesTxMetrics::from(x)); if let Some(iqtxm) = iqtxm { self.daemon_metrics.iqtx_len_st_rf1().set(iqtxm.st_rf1_len as _); self.daemon_metrics.iqtx_len_st_rf3().set(iqtxm.st_rf3_len as _); @@ -974,10 +937,6 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> let opts2 = DaemonOpts { pgconf: opts.postgresql_config().clone(), - scyconf_st_rf3: opts.scylla_config_st().clone(), - scyconf_st_rf1: opts.scylla_config_st_rf1().clone(), - scyconf_mt: opts.scylla_config_mt().clone(), - scyconf_lt: opts.scylla_config_lt().clone(), test_bsread_addr: opts.test_bsread_addr.clone(), insert_frac: insert_frac.clone(), store_workers_rate, diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 6c8d805..2dcd285 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -70,6 +70,20 @@ impl CaIngestOpts { &self.postgresql } + pub fn scylla_insert_set_conf(&self, n: usize) -> Option { + if n == 0 { + let ret = ScyllaInsertsetConf { + st_rf1: self.scylla_config_st_rf1(), + st_rf3: self.scylla_config_st(), + mt_rf3: self.scylla_config_mt(), + lt_rf3: self.scylla_config_lt(), + }; + Some(ret) + } else { + None + } + } + pub fn scylla_config_st(&self) -> ScyllaIngestConfig { let d = &self.scylla; let c = &self.scylla_st; @@ -139,7 +153,7 @@ impl CaIngestOpts { } pub fn insert_item_queue_cap(&self) -> usize { - self.insert_item_queue_cap.unwrap_or(1000 * 1000) * 2 + self.insert_item_queue_cap.unwrap_or(1000 * 100) } pub fn store_workers_rate(&self) -> u64 { @@ -276,6 +290,61 @@ scylla_lt: assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); } +#[test] +fn parse_config_with_scylla_double_ingest() { + let conf = r###" +backend: test_backend +timeout: 10m 3s 45ms +api_bind: "0.0.0.0:3011" +channels: /some/path/file.txt +search: +- 172.26.0.255 +- 172.26.2.255 +postgresql: + host: host.example.com + port: 5432 + user: USER + pass: PASS + name: NAME +scylla: + hosts: + - node1:19042 + - node2:19042 +scylla_st: + keyspace: ks_st +scylla_mt: + keyspace: ks_mt +scylla_st_rf1: + keyspace: ks_st_rf1 +scylla_lt: + keyspace: ks_lt + hosts: + - node3:19042 + - node4:19042 +scylla_2nd: + scylla: + hosts: + - node1:19042 + scylla_st: + keyspace: ks_2nd_st + scylla_mt: + keyspace: ks_2nd_mt + scylla_lt: + keyspace: ks_2nd_lt + scylla_st_rf1: + keyspace: ks_2nd_st_rf1 +"###; + let res: Result = serde_yaml::from_slice(conf.as_bytes()); + let conf = res.unwrap(); + assert_eq!(conf.is_valid(), true); + assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt"))); + assert_eq!(&conf.api_bind, "0.0.0.0:3011"); + assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string())); + assert_eq!(conf.scylla_config_st().hosts().get(1), Some(&"node2:19042".to_string())); + assert_eq!(conf.scylla_config_lt().hosts().get(1), Some(&"node4:19042".to_string())); + assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); +} + #[test] fn test_duration_parse() { #[derive(Serialize, Deserialize)] @@ -890,3 +959,28 @@ impl From for ChannelConfigForStatesApi { Self { arch: value.arch } } } + +pub struct ScyllaInsertsetConf { + st_rf1: ScyllaIngestConfig, + st_rf3: ScyllaIngestConfig, + mt_rf3: ScyllaIngestConfig, + lt_rf3: ScyllaIngestConfig, +} + +impl ScyllaInsertsetConf { + pub fn st_rf1(&self) -> &ScyllaIngestConfig { + &self.st_rf1 + } + + pub fn st_rf3(&self) -> &ScyllaIngestConfig { + &self.st_rf3 + } + + pub fn mt_rf3(&self) -> &ScyllaIngestConfig { + &self.mt_rf3 + } + + pub fn lt_rf3(&self) -> &ScyllaIngestConfig { + &self.lt_rf3 + } +} diff --git a/netfetch/src/metrics/types.rs b/netfetch/src/metrics/types.rs index 119c7d6..5dcc16d 100644 --- a/netfetch/src/metrics/types.rs +++ b/netfetch/src/metrics/types.rs @@ -1,26 +1,6 @@ use scywr::insertqueues::InsertQueuesTx; use serde::Serialize; -pub struct InsertQueuesTxMetrics { - pub st_rf1_len: usize, - pub st_rf3_len: usize, - pub mt_rf3_len: usize, - pub lt_rf3_len: usize, - pub lt_rf3_lat5_len: usize, -} - -impl From<&InsertQueuesTx> for InsertQueuesTxMetrics { - fn from(value: &InsertQueuesTx) -> Self { - Self { - st_rf1_len: value.st_rf1_tx.len(), - st_rf3_len: value.st_rf3_tx.len(), - mt_rf3_len: value.mt_rf3_tx.len(), - lt_rf3_len: value.lt_rf3_tx.len(), - lt_rf3_lat5_len: value.lt_rf3_lat5_tx.len(), - } - } -} - #[derive(Debug, Serialize)] pub struct MetricsPrometheusShort { counters: Vec, diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index 7e20625..d9a3547 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -4,10 +4,10 @@ use crate::iteminsertqueue::QueryItem; use crate::senderpolling::SenderPolling; use async_channel::Receiver; use async_channel::Sender; -use core::fmt; use netpod::ttl::RetentionTime; use pin_project::pin_project; use std::collections::VecDeque; +use std::fmt; use std::pin::Pin; autoerr::create_error_v1!( @@ -151,6 +151,65 @@ pub struct InsertQueuesRx { pub lt_rf3_lat5_rx: Receiver>, } +impl InsertQueuesRx { + pub fn clone_2(self) -> (Self, Self) { + async fn feed( + rx: Receiver>, + tx1: Sender>, + tx2: Sender>, + ) { + loop { + match rx.recv().await { + Ok(item1) => { + let item2 = item1.clone(); + match tx1.send(item1).await { + Ok(()) => {} + Err(_) => { + break; + } + } + match tx2.send(item2).await { + Ok(()) => {} + Err(_) => { + break; + } + } + } + Err(_) => { + break; + } + } + } + } + fn dupl(rx: Receiver>) -> (Receiver>, Receiver>) { + let (tx1, rx1) = async_channel::bounded(128); + let (tx2, rx2) = async_channel::bounded(128); + taskrun::tokio::spawn(feed(rx, tx1, tx2)); + (rx1, rx2) + } + let st_rf1_rx = dupl(self.st_rf1_rx); + let st_rf3_rx = dupl(self.st_rf3_rx); + let mt_rf3_rx = dupl(self.mt_rf3_rx); + let lt_rf3_rx = dupl(self.lt_rf3_rx); + let lt_rf3_lat5_rx = dupl(self.lt_rf3_lat5_rx); + let ret1 = InsertQueuesRx { + st_rf1_rx: st_rf1_rx.0, + st_rf3_rx: st_rf3_rx.0, + mt_rf3_rx: mt_rf3_rx.0, + lt_rf3_rx: lt_rf3_rx.0, + lt_rf3_lat5_rx: lt_rf3_lat5_rx.0, + }; + let ret2 = InsertQueuesRx { + st_rf1_rx: st_rf1_rx.1, + st_rf3_rx: st_rf3_rx.1, + mt_rf3_rx: mt_rf3_rx.1, + lt_rf3_rx: lt_rf3_rx.1, + lt_rf3_lat5_rx: lt_rf3_lat5_rx.1, + }; + (ret1, ret2) + } +} + pub struct InsertDeques { pub st_rf1_qu: VecDeque, pub st_rf3_qu: VecDeque, @@ -330,3 +389,23 @@ impl<'a> fmt::Display for InsertSenderPollingSummary<'a> { ) } } + +pub struct InsertQueuesTxMetrics { + pub st_rf1_len: usize, + pub st_rf3_len: usize, + pub mt_rf3_len: usize, + pub lt_rf3_len: usize, + pub lt_rf3_lat5_len: usize, +} + +impl From<&InsertQueuesTx> for InsertQueuesTxMetrics { + fn from(value: &InsertQueuesTx) -> Self { + Self { + st_rf1_len: value.st_rf1_tx.len(), + st_rf3_len: value.st_rf3_tx.len(), + mt_rf3_len: value.mt_rf3_tx.len(), + lt_rf3_len: value.lt_rf3_tx.len(), + lt_rf3_lat5_len: value.lt_rf3_lat5_tx.len(), + } + } +}