WIP config 2nd scylla outlet
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "daqingest"
|
||||
version = "0.3.0"
|
||||
version = "0.3.1-aa.1"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2024"
|
||||
|
||||
|
||||
@@ -160,6 +160,8 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
|
||||
}
|
||||
|
||||
async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), Error> {
|
||||
todo!("scylla_schema_check config");
|
||||
let scy_confs = opts.scylla_insert_set_conf(0);
|
||||
let opstr = if do_change { "change" } else { "check" };
|
||||
info!("start scylla schema {}", opstr);
|
||||
info!("{:?}", opts.scylla_config_st());
|
||||
|
||||
@@ -5,13 +5,14 @@ use async_channel::Sender;
|
||||
use channeltools::channel_combine_ab::ChannelCombineAB;
|
||||
use dbpg::seriesbychannel::ChannelInfoQuery;
|
||||
use err::Error;
|
||||
use log::*;
|
||||
use log;
|
||||
use netfetch::ca::connset::CaConnSet;
|
||||
use netfetch::ca::connset::CaConnSetCtrl;
|
||||
use netfetch::ca::connset::CaConnSetItem;
|
||||
use netfetch::conf::CaIngestOpts;
|
||||
use netfetch::conf::ChannelConfig;
|
||||
use netfetch::conf::ChannelsConfig;
|
||||
use netfetch::conf::ScyllaInsertsetConf;
|
||||
use netfetch::daemon_common::ChannelName;
|
||||
use netfetch::daemon_common::DaemonEvent;
|
||||
use netfetch::metrics::RoutesResources;
|
||||
@@ -19,7 +20,6 @@ use netfetch::metrics::StatsSet;
|
||||
use netfetch::throttletrace::ThrottleTrace;
|
||||
use netpod::Database;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use scywr::config::ScyllaIngestConfig;
|
||||
use scywr::insertqueues::InsertQueuesRx;
|
||||
use scywr::insertqueues::InsertQueuesTx;
|
||||
use scywr::insertworker::InsertWorkerOpts;
|
||||
@@ -30,21 +30,20 @@ use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
use taskrun::tokio;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000);
|
||||
const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000);
|
||||
const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500);
|
||||
|
||||
macro_rules! error { ($($arg:tt)*) => { if true { log::error!($($arg)*); } }; }
|
||||
macro_rules! warn { ($($arg:tt)*) => { if true { log::warn!($($arg)*); } }; }
|
||||
macro_rules! info { ($($arg:tt)*) => { if true { log::info!($($arg)*); } }; }
|
||||
macro_rules! debug { ($($arg:tt)*) => { if true { log::debug!($($arg)*); } }; }
|
||||
|
||||
pub struct DaemonOpts {
|
||||
pgconf: Database,
|
||||
scyconf_st_rf3: ScyllaIngestConfig,
|
||||
scyconf_st_rf1: ScyllaIngestConfig,
|
||||
scyconf_mt: ScyllaIngestConfig,
|
||||
scyconf_lt: ScyllaIngestConfig,
|
||||
#[allow(unused)]
|
||||
test_bsread_addr: Option<String>,
|
||||
insert_frac: Arc<AtomicU64>,
|
||||
@@ -56,14 +55,6 @@ pub struct Daemon {
|
||||
ingest_opts: CaIngestOpts,
|
||||
tx: Sender<DaemonEvent>,
|
||||
rx: Receiver<DaemonEvent>,
|
||||
insert_queue_counter: Arc<AtomicUsize>,
|
||||
count_unknown_address: usize,
|
||||
count_search_pending: usize,
|
||||
count_search_sent: usize,
|
||||
count_no_address: usize,
|
||||
count_unassigned: usize,
|
||||
count_assigned: usize,
|
||||
last_status_print: SystemTime,
|
||||
insert_workers_jhs: Vec<JoinHandle<Result<(), scywr::insertworker::Error>>>,
|
||||
shutting_down: bool,
|
||||
connset_ctrl: CaConnSetCtrl,
|
||||
@@ -83,41 +74,12 @@ pub struct Daemon {
|
||||
impl Daemon {
|
||||
pub async fn new(opts: DaemonOpts, ingest_opts: CaIngestOpts) -> Result<Self, Error> {
|
||||
let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32);
|
||||
|
||||
// TODO keep join handles and await later
|
||||
let (channel_info_query_tx, jhs, jh) =
|
||||
dbpg::seriesbychannel::start_lookup_workers::<dbpg::seriesbychannel::SalterRandom>(2, &opts.pgconf)
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
|
||||
// TODO so far a dummy
|
||||
let (series_conf_by_id_tx, _series_conf_by_id_rx) = async_channel::bounded(16);
|
||||
|
||||
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let local_epics_hostname = ingest_linux::net::local_hostname();
|
||||
|
||||
#[cfg(target_abi = "x32")]
|
||||
let query_item_rx = {
|
||||
// TODO only testing, remove
|
||||
tokio::spawn({
|
||||
let rx = query_item_rx;
|
||||
async move {
|
||||
while let Ok(item) = rx.recv().await {
|
||||
drop(item);
|
||||
}
|
||||
}
|
||||
});
|
||||
let (tx, rx) = async_channel::bounded(128);
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_millis(2000)).await;
|
||||
tx.len();
|
||||
}
|
||||
});
|
||||
rx
|
||||
};
|
||||
|
||||
let insert_worker_opts = InsertWorkerOpts {
|
||||
store_workers_rate: opts.store_workers_rate.clone(),
|
||||
insert_workers_running: Arc::new(AtomicU64::new(0)),
|
||||
@@ -126,6 +88,9 @@ impl Daemon {
|
||||
};
|
||||
let insert_worker_opts = Arc::new(insert_worker_opts);
|
||||
|
||||
// TODO so far a dummy
|
||||
let (series_conf_by_id_tx, _series_conf_by_id_rx) = async_channel::bounded(16);
|
||||
|
||||
let (iqtx, iqrx) = {
|
||||
let (st_rf3_tx, st_rf3_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
|
||||
let (st_rf1_tx, st_rf1_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap());
|
||||
@@ -182,14 +147,65 @@ impl Daemon {
|
||||
}
|
||||
});
|
||||
|
||||
// let query_item_tx_weak = query_item_tx.downgrade();
|
||||
// Insert queue hook
|
||||
// let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx);
|
||||
|
||||
let ignore_writes = ingest_opts.scylla_ignore_writes();
|
||||
|
||||
let (insert_worker_output_tx, insert_worker_output_rx) = async_channel::bounded(256);
|
||||
let mut insert_worker_jhs = Vec::new();
|
||||
|
||||
// TODO join these
|
||||
let mut insert_workers_jhs = Vec::new();
|
||||
|
||||
async fn spawn_scylla_insert_workers(
|
||||
scylla_set: &ScyllaInsertsetConf,
|
||||
ingest_opts: &CaIngestOpts,
|
||||
insert_worker_opts: &Arc<InsertWorkerOpts>,
|
||||
iqrx: InsertQueuesRx,
|
||||
ignore_writes: bool,
|
||||
insert_worker_output_tx: Sender<scywr::insertworker::InsertWorkerOutputItem>,
|
||||
) -> Result<Vec<JoinHandle<Result<(), scywr::insertworker::Error>>>, Error> {
|
||||
let rts = [
|
||||
RetentionTime::Short,
|
||||
RetentionTime::Short,
|
||||
RetentionTime::Medium,
|
||||
RetentionTime::Long,
|
||||
RetentionTime::Long,
|
||||
];
|
||||
let scyconfs = [
|
||||
scylla_set.st_rf1().clone(),
|
||||
scylla_set.st_rf3().clone(),
|
||||
scylla_set.mt_rf3().clone(),
|
||||
scylla_set.lt_rf3().clone(),
|
||||
scylla_set.lt_rf3().clone(),
|
||||
];
|
||||
let rxs = [
|
||||
iqrx.st_rf1_rx,
|
||||
iqrx.st_rf3_rx,
|
||||
iqrx.mt_rf3_rx,
|
||||
iqrx.lt_rf3_rx,
|
||||
iqrx.lt_rf3_lat5_rx,
|
||||
];
|
||||
let mut jhs = Vec::new();
|
||||
for (rt, (scyconf, rx)) in rts.into_iter().zip(scyconfs.into_iter().zip(rxs.into_iter())) {
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers(
|
||||
rt,
|
||||
scyconf,
|
||||
ingest_opts.insert_scylla_sessions(),
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
rx,
|
||||
insert_worker_opts.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
jhs.extend(jh);
|
||||
}
|
||||
Ok(jhs)
|
||||
}
|
||||
|
||||
if ingest_opts.scylla_disable() {
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
|
||||
@@ -201,7 +217,7 @@ impl Daemon {
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
insert_workers_jhs.extend(jh);
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
@@ -211,7 +227,7 @@ impl Daemon {
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
insert_workers_jhs.extend(jh);
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
@@ -221,7 +237,7 @@ impl Daemon {
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
insert_workers_jhs.extend(jh);
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
@@ -231,7 +247,7 @@ impl Daemon {
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
insert_workers_jhs.extend(jh);
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
@@ -241,73 +257,44 @@ impl Daemon {
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
insert_workers_jhs.extend(jh);
|
||||
} else {
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers(
|
||||
RetentionTime::Short,
|
||||
opts.scyconf_st_rf1.clone(),
|
||||
ingest_opts.insert_scylla_sessions(),
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
iqrx.st_rf1_rx,
|
||||
insert_worker_opts.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers(
|
||||
RetentionTime::Short,
|
||||
opts.scyconf_st_rf3.clone(),
|
||||
ingest_opts.insert_scylla_sessions(),
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
iqrx.st_rf3_rx,
|
||||
insert_worker_opts.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers(
|
||||
RetentionTime::Medium,
|
||||
opts.scyconf_mt.clone(),
|
||||
ingest_opts.insert_scylla_sessions(),
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
iqrx.mt_rf3_rx,
|
||||
insert_worker_opts.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
|
||||
let lt_rx_combined = ChannelCombineAB::new(iqrx.lt_rf3_rx, iqrx.lt_rf3_lat5_rx);
|
||||
|
||||
let jh = scywr::insertworker::spawn_scylla_insert_workers(
|
||||
RetentionTime::Long,
|
||||
opts.scyconf_lt.clone(),
|
||||
ingest_opts.insert_scylla_sessions(),
|
||||
ingest_opts.insert_worker_count(),
|
||||
ingest_opts.insert_worker_concurrency(),
|
||||
lt_rx_combined,
|
||||
insert_worker_opts.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
insert_worker_jhs.extend(jh);
|
||||
let scyset1 = ingest_opts.scylla_insert_set_conf(0).unwrap();
|
||||
if let Some(scyset2) = ingest_opts.scylla_insert_set_conf(1) {
|
||||
let (iqrx1, iqrx2) = iqrx.clone_2();
|
||||
let jhs = spawn_scylla_insert_workers(
|
||||
&scyset1,
|
||||
&ingest_opts,
|
||||
&insert_worker_opts,
|
||||
iqrx1,
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await?;
|
||||
insert_workers_jhs.extend(jhs);
|
||||
let jhs = spawn_scylla_insert_workers(
|
||||
&scyset2,
|
||||
&ingest_opts,
|
||||
&insert_worker_opts,
|
||||
iqrx2,
|
||||
ignore_writes,
|
||||
insert_worker_output_tx,
|
||||
)
|
||||
.await?;
|
||||
insert_workers_jhs.extend(jhs);
|
||||
} else {
|
||||
let jhs = spawn_scylla_insert_workers(
|
||||
&scyset1,
|
||||
&ingest_opts,
|
||||
&insert_worker_opts,
|
||||
iqrx,
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await?;
|
||||
insert_workers_jhs.extend(jhs);
|
||||
}
|
||||
// let lt_rx_combined = ChannelCombineAB::new(iqrx.lt_rf3_rx, iqrx.lt_rf3_lat5_rx);
|
||||
};
|
||||
|
||||
#[cfg(feature = "bsread")]
|
||||
@@ -345,26 +332,10 @@ impl Daemon {
|
||||
|
||||
{
|
||||
// TODO join the task
|
||||
let tx = daemon_ev_tx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
match insert_worker_output_rx.recv().await {
|
||||
Ok(x) => {
|
||||
match tx.send(DaemonEvent::ScyllaInsertWorkerOutput(x)).await {
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
// TODO
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
// TODO
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
let jh = tokio::task::spawn(Self::insert_worker_out_merge(
|
||||
insert_worker_output_rx,
|
||||
daemon_ev_tx.clone(),
|
||||
));
|
||||
}
|
||||
let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8);
|
||||
|
||||
@@ -373,15 +344,7 @@ impl Daemon {
|
||||
ingest_opts,
|
||||
tx: daemon_ev_tx,
|
||||
rx: daemon_ev_rx,
|
||||
insert_queue_counter,
|
||||
count_unknown_address: 0,
|
||||
count_search_pending: 0,
|
||||
count_search_sent: 0,
|
||||
count_no_address: 0,
|
||||
count_unassigned: 0,
|
||||
count_assigned: 0,
|
||||
last_status_print: SystemTime::now(),
|
||||
insert_workers_jhs: insert_worker_jhs,
|
||||
insert_workers_jhs,
|
||||
shutting_down: false,
|
||||
connset_ctrl: conn_set_ctrl,
|
||||
connset_status_last: Instant::now(),
|
||||
@@ -397,6 +360,29 @@ impl Daemon {
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn insert_worker_out_merge(
|
||||
rx: Receiver<scywr::insertworker::InsertWorkerOutputItem>,
|
||||
tx: Sender<DaemonEvent>,
|
||||
) {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(x) => {
|
||||
match tx.send(DaemonEvent::ScyllaInsertWorkerOutput(x)).await {
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
// TODO
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
// TODO
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_health(&mut self, ts1: Instant) -> Result<(), Error> {
|
||||
self.check_health_connset(ts1)?;
|
||||
Ok(())
|
||||
@@ -484,20 +470,8 @@ impl Daemon {
|
||||
async fn handle_timer_tick(&mut self) -> Result<(), Error> {
|
||||
if self.shutting_down {
|
||||
let nworkers = self.insert_workers_running.load(atomic::Ordering::Acquire);
|
||||
#[cfg(feature = "DISABLED")]
|
||||
{
|
||||
let nitems = self
|
||||
.query_item_tx_weak
|
||||
.upgrade()
|
||||
.map(|x| (x.sender_count(), x.receiver_count(), x.len()));
|
||||
info!("qu senders A nworkers {} nitems {:?}", nworkers, nitems);
|
||||
}
|
||||
if nworkers == 0 {
|
||||
info!("goodbye");
|
||||
std::process::exit(0);
|
||||
}
|
||||
log::info!("shutting_down insert_workers_running {nworkers}");
|
||||
}
|
||||
let tsnow = SystemTime::now();
|
||||
{
|
||||
let n = SIGINT.load(atomic::Ordering::Acquire);
|
||||
let m = SIGINT_CONFIRM.load(atomic::Ordering::Acquire);
|
||||
@@ -510,29 +484,18 @@ impl Daemon {
|
||||
warn!("Received SIGTERM");
|
||||
SIGTERM.store(2, atomic::Ordering::Release);
|
||||
}
|
||||
let ts1 = Instant::now();
|
||||
self.check_health(ts1).await?;
|
||||
let dt = ts1.elapsed();
|
||||
if dt > CHECK_CHANNEL_SLOW_WARN {
|
||||
info!("slow check_chans {:.0} ms", dt.as_secs_f32() * 1e3);
|
||||
}
|
||||
if false && tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= PRINT_STATUS_INTERVAL {
|
||||
self.last_status_print = tsnow;
|
||||
info!(
|
||||
"{:8} {:8} {:8} : {:8} : {:8} {:8} : {:10}",
|
||||
self.count_unknown_address,
|
||||
self.count_search_pending,
|
||||
self.count_search_sent,
|
||||
self.count_no_address,
|
||||
self.count_unassigned,
|
||||
self.count_assigned,
|
||||
self.insert_queue_counter.load(atomic::Ordering::Acquire)
|
||||
);
|
||||
{
|
||||
let ts1 = Instant::now();
|
||||
self.check_health(ts1).await?;
|
||||
let dt = ts1.elapsed();
|
||||
if dt > CHECK_CHANNEL_SLOW_WARN {
|
||||
info!("slow check_chans {:.0} ms", dt.as_secs_f32() * 1e3);
|
||||
}
|
||||
}
|
||||
let iqtxm = self
|
||||
.iqtx
|
||||
.as_ref()
|
||||
.map(|x| netfetch::metrics::types::InsertQueuesTxMetrics::from(x));
|
||||
.map(|x| scywr::insertqueues::InsertQueuesTxMetrics::from(x));
|
||||
if let Some(iqtxm) = iqtxm {
|
||||
self.daemon_metrics.iqtx_len_st_rf1().set(iqtxm.st_rf1_len as _);
|
||||
self.daemon_metrics.iqtx_len_st_rf3().set(iqtxm.st_rf3_len as _);
|
||||
@@ -974,10 +937,6 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
|
||||
|
||||
let opts2 = DaemonOpts {
|
||||
pgconf: opts.postgresql_config().clone(),
|
||||
scyconf_st_rf3: opts.scylla_config_st().clone(),
|
||||
scyconf_st_rf1: opts.scylla_config_st_rf1().clone(),
|
||||
scyconf_mt: opts.scylla_config_mt().clone(),
|
||||
scyconf_lt: opts.scylla_config_lt().clone(),
|
||||
test_bsread_addr: opts.test_bsread_addr.clone(),
|
||||
insert_frac: insert_frac.clone(),
|
||||
store_workers_rate,
|
||||
|
||||
@@ -70,6 +70,20 @@ impl CaIngestOpts {
|
||||
&self.postgresql
|
||||
}
|
||||
|
||||
pub fn scylla_insert_set_conf(&self, n: usize) -> Option<ScyllaInsertsetConf> {
|
||||
if n == 0 {
|
||||
let ret = ScyllaInsertsetConf {
|
||||
st_rf1: self.scylla_config_st_rf1(),
|
||||
st_rf3: self.scylla_config_st(),
|
||||
mt_rf3: self.scylla_config_mt(),
|
||||
lt_rf3: self.scylla_config_lt(),
|
||||
};
|
||||
Some(ret)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn scylla_config_st(&self) -> ScyllaIngestConfig {
|
||||
let d = &self.scylla;
|
||||
let c = &self.scylla_st;
|
||||
@@ -139,7 +153,7 @@ impl CaIngestOpts {
|
||||
}
|
||||
|
||||
pub fn insert_item_queue_cap(&self) -> usize {
|
||||
self.insert_item_queue_cap.unwrap_or(1000 * 1000) * 2
|
||||
self.insert_item_queue_cap.unwrap_or(1000 * 100)
|
||||
}
|
||||
|
||||
pub fn store_workers_rate(&self) -> u64 {
|
||||
@@ -276,6 +290,61 @@ scylla_lt:
|
||||
assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_config_with_scylla_double_ingest() {
|
||||
let conf = r###"
|
||||
backend: test_backend
|
||||
timeout: 10m 3s 45ms
|
||||
api_bind: "0.0.0.0:3011"
|
||||
channels: /some/path/file.txt
|
||||
search:
|
||||
- 172.26.0.255
|
||||
- 172.26.2.255
|
||||
postgresql:
|
||||
host: host.example.com
|
||||
port: 5432
|
||||
user: USER
|
||||
pass: PASS
|
||||
name: NAME
|
||||
scylla:
|
||||
hosts:
|
||||
- node1:19042
|
||||
- node2:19042
|
||||
scylla_st:
|
||||
keyspace: ks_st
|
||||
scylla_mt:
|
||||
keyspace: ks_mt
|
||||
scylla_st_rf1:
|
||||
keyspace: ks_st_rf1
|
||||
scylla_lt:
|
||||
keyspace: ks_lt
|
||||
hosts:
|
||||
- node3:19042
|
||||
- node4:19042
|
||||
scylla_2nd:
|
||||
scylla:
|
||||
hosts:
|
||||
- node1:19042
|
||||
scylla_st:
|
||||
keyspace: ks_2nd_st
|
||||
scylla_mt:
|
||||
keyspace: ks_2nd_mt
|
||||
scylla_lt:
|
||||
keyspace: ks_2nd_lt
|
||||
scylla_st_rf1:
|
||||
keyspace: ks_2nd_st_rf1
|
||||
"###;
|
||||
let res: Result<CaIngestOpts, _> = serde_yaml::from_slice(conf.as_bytes());
|
||||
let conf = res.unwrap();
|
||||
assert_eq!(conf.is_valid(), true);
|
||||
assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt")));
|
||||
assert_eq!(&conf.api_bind, "0.0.0.0:3011");
|
||||
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
|
||||
assert_eq!(conf.scylla_config_st().hosts().get(1), Some(&"node2:19042".to_string()));
|
||||
assert_eq!(conf.scylla_config_lt().hosts().get(1), Some(&"node4:19042".to_string()));
|
||||
assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_duration_parse() {
|
||||
#[derive(Serialize, Deserialize)]
|
||||
@@ -890,3 +959,28 @@ impl From<ChannelConfig> for ChannelConfigForStatesApi {
|
||||
Self { arch: value.arch }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ScyllaInsertsetConf {
|
||||
st_rf1: ScyllaIngestConfig,
|
||||
st_rf3: ScyllaIngestConfig,
|
||||
mt_rf3: ScyllaIngestConfig,
|
||||
lt_rf3: ScyllaIngestConfig,
|
||||
}
|
||||
|
||||
impl ScyllaInsertsetConf {
|
||||
pub fn st_rf1(&self) -> &ScyllaIngestConfig {
|
||||
&self.st_rf1
|
||||
}
|
||||
|
||||
pub fn st_rf3(&self) -> &ScyllaIngestConfig {
|
||||
&self.st_rf3
|
||||
}
|
||||
|
||||
pub fn mt_rf3(&self) -> &ScyllaIngestConfig {
|
||||
&self.mt_rf3
|
||||
}
|
||||
|
||||
pub fn lt_rf3(&self) -> &ScyllaIngestConfig {
|
||||
&self.lt_rf3
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,26 +1,6 @@
|
||||
use scywr::insertqueues::InsertQueuesTx;
|
||||
use serde::Serialize;
|
||||
|
||||
pub struct InsertQueuesTxMetrics {
|
||||
pub st_rf1_len: usize,
|
||||
pub st_rf3_len: usize,
|
||||
pub mt_rf3_len: usize,
|
||||
pub lt_rf3_len: usize,
|
||||
pub lt_rf3_lat5_len: usize,
|
||||
}
|
||||
|
||||
impl From<&InsertQueuesTx> for InsertQueuesTxMetrics {
|
||||
fn from(value: &InsertQueuesTx) -> Self {
|
||||
Self {
|
||||
st_rf1_len: value.st_rf1_tx.len(),
|
||||
st_rf3_len: value.st_rf3_tx.len(),
|
||||
mt_rf3_len: value.mt_rf3_tx.len(),
|
||||
lt_rf3_len: value.lt_rf3_tx.len(),
|
||||
lt_rf3_lat5_len: value.lt_rf3_lat5_tx.len(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct MetricsPrometheusShort {
|
||||
counters: Vec<String>,
|
||||
|
||||
@@ -4,10 +4,10 @@ use crate::iteminsertqueue::QueryItem;
|
||||
use crate::senderpolling::SenderPolling;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use core::fmt;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use pin_project::pin_project;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
|
||||
autoerr::create_error_v1!(
|
||||
@@ -151,6 +151,65 @@ pub struct InsertQueuesRx {
|
||||
pub lt_rf3_lat5_rx: Receiver<VecDeque<QueryItem>>,
|
||||
}
|
||||
|
||||
impl InsertQueuesRx {
|
||||
pub fn clone_2(self) -> (Self, Self) {
|
||||
async fn feed(
|
||||
rx: Receiver<VecDeque<QueryItem>>,
|
||||
tx1: Sender<VecDeque<QueryItem>>,
|
||||
tx2: Sender<VecDeque<QueryItem>>,
|
||||
) {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(item1) => {
|
||||
let item2 = item1.clone();
|
||||
match tx1.send(item1).await {
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
match tx2.send(item2).await {
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fn dupl(rx: Receiver<VecDeque<QueryItem>>) -> (Receiver<VecDeque<QueryItem>>, Receiver<VecDeque<QueryItem>>) {
|
||||
let (tx1, rx1) = async_channel::bounded(128);
|
||||
let (tx2, rx2) = async_channel::bounded(128);
|
||||
taskrun::tokio::spawn(feed(rx, tx1, tx2));
|
||||
(rx1, rx2)
|
||||
}
|
||||
let st_rf1_rx = dupl(self.st_rf1_rx);
|
||||
let st_rf3_rx = dupl(self.st_rf3_rx);
|
||||
let mt_rf3_rx = dupl(self.mt_rf3_rx);
|
||||
let lt_rf3_rx = dupl(self.lt_rf3_rx);
|
||||
let lt_rf3_lat5_rx = dupl(self.lt_rf3_lat5_rx);
|
||||
let ret1 = InsertQueuesRx {
|
||||
st_rf1_rx: st_rf1_rx.0,
|
||||
st_rf3_rx: st_rf3_rx.0,
|
||||
mt_rf3_rx: mt_rf3_rx.0,
|
||||
lt_rf3_rx: lt_rf3_rx.0,
|
||||
lt_rf3_lat5_rx: lt_rf3_lat5_rx.0,
|
||||
};
|
||||
let ret2 = InsertQueuesRx {
|
||||
st_rf1_rx: st_rf1_rx.1,
|
||||
st_rf3_rx: st_rf3_rx.1,
|
||||
mt_rf3_rx: mt_rf3_rx.1,
|
||||
lt_rf3_rx: lt_rf3_rx.1,
|
||||
lt_rf3_lat5_rx: lt_rf3_lat5_rx.1,
|
||||
};
|
||||
(ret1, ret2)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InsertDeques {
|
||||
pub st_rf1_qu: VecDeque<QueryItem>,
|
||||
pub st_rf3_qu: VecDeque<QueryItem>,
|
||||
@@ -330,3 +389,23 @@ impl<'a> fmt::Display for InsertSenderPollingSummary<'a> {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InsertQueuesTxMetrics {
|
||||
pub st_rf1_len: usize,
|
||||
pub st_rf3_len: usize,
|
||||
pub mt_rf3_len: usize,
|
||||
pub lt_rf3_len: usize,
|
||||
pub lt_rf3_lat5_len: usize,
|
||||
}
|
||||
|
||||
impl From<&InsertQueuesTx> for InsertQueuesTxMetrics {
|
||||
fn from(value: &InsertQueuesTx) -> Self {
|
||||
Self {
|
||||
st_rf1_len: value.st_rf1_tx.len(),
|
||||
st_rf3_len: value.st_rf3_tx.len(),
|
||||
mt_rf3_len: value.mt_rf3_tx.len(),
|
||||
lt_rf3_len: value.lt_rf3_tx.len(),
|
||||
lt_rf3_lat5_len: value.lt_rf3_lat5_tx.len(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user