diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index ea09b69..94bfdf3 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -12,6 +12,7 @@ use crate::errconv::ErrConv; use crate::insertworker::spawn_scylla_insert_workers; use crate::linuxhelper::local_hostname; use crate::metrics::metrics_agg_task; +use crate::rt::TokMx; use crate::store::CommonInsertItemQueue; use err::Error; use futures_util::stream::FuturesUnordered; @@ -61,6 +62,7 @@ struct ChannelConfig { api_bind: Option, local_epics_hostname: Option, store_workers_rate: Option, + insert_frac: Option, } #[test] @@ -138,6 +140,7 @@ pub async fn parse_config(config: PathBuf) -> Result { api_bind: conf.api_bind.unwrap_or_else(|| "0.0.0.0:3011".into()), local_epics_hostname: conf.local_epics_hostname.unwrap_or_else(local_hostname), store_workers_rate: conf.store_workers_rate.unwrap_or(10000), + insert_frac: conf.insert_frac.unwrap_or(1000), }) } @@ -159,6 +162,7 @@ pub struct CaConnectOpts { pub api_bind: String, pub local_epics_hostname: String, pub store_workers_rate: u64, + pub insert_frac: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -177,9 +181,9 @@ pub struct IngestCommons { pub local_epics_hostname: String, pub insert_item_queue: Arc, pub data_store: Arc, - pub insert_frac: Arc, pub insert_ivl_min: Arc, - pub extra_inserts_conf: Mutex, + pub extra_inserts_conf: TokMx, + pub insert_frac: AtomicU64, pub store_workers_rate: AtomicU64, pub ca_conn_set: CaConnSet, } @@ -277,8 +281,7 @@ async fn query_addr_multiple(pg_client: &PgClient) -> Result<(), Error> { pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { crate::linuxhelper::set_signal_handler()?; - let insert_frac = Arc::new(AtomicU64::new(1000)); - let extra_inserts_conf = Mutex::new(ExtraInsertsConf { copies: Vec::new() }); + let extra_inserts_conf = TokMx::new(ExtraInsertsConf { copies: Vec::new() }); let insert_ivl_min = Arc::new(AtomicU64::new(8800)); let opts = parse_config(opts.config).await?; let scyconf = opts.scyconf.clone(); @@ -332,9 +335,9 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { insert_item_queue: insert_item_queue.clone(), data_store: data_store.clone(), insert_ivl_min: insert_ivl_min.clone(), - insert_frac: insert_frac.clone(), extra_inserts_conf, store_workers_rate: AtomicU64::new(opts.store_workers_rate), + insert_frac: AtomicU64::new(opts.insert_frac), ca_conn_set: CaConnSet::new(), }; let ingest_commons = Arc::new(ingest_commons); @@ -346,7 +349,6 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { opts.insert_scylla_sessions, opts.insert_worker_count, insert_item_queue.clone(), - insert_frac.clone(), ingest_commons.clone(), pg_client.clone(), store_stats.clone(), diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index aef704e..1025720 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -3,9 +3,9 @@ use crate::ca::IngestCommons; use crate::rt::JoinHandle; use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem}; use err::Error; -use log::trace; +use log::*; use netpod::ScyllaConfig; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio_postgres::Client as PgClient; @@ -22,7 +22,8 @@ fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) { Error::DbUnavailable => { stats.store_worker_insert_unavailable_inc(); } - Error::DbError(_) => { + Error::DbError(e) => { + warn!("db error {e}"); stats.store_worker_insert_error_inc(); } } @@ -46,11 +47,50 @@ pub async fn spawn_scylla_insert_workers( insert_scylla_sessions: usize, insert_worker_count: usize, insert_item_queue: Arc, - insert_frac: Arc, ingest_commons: Arc, pg_client: Arc, store_stats: Arc, ) -> Result>, Error> { + let (q2_tx, q2_rx) = async_channel::bounded(insert_item_queue.receiver().capacity().unwrap_or(20000)); + { + let ingest_commons = ingest_commons.clone(); + let stats = store_stats.clone(); + let recv = insert_item_queue.receiver(); + let mut ts_recv_last = Instant::now(); + let fut = async move { + loop { + let item = if let Ok(x) = recv.recv().await { + x + } else { + break; + }; + let allowed_to_drop = match &item { + QueryItem::Insert(_) => true, + _ => false, + }; + let dt_min = { + let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire); + Duration::from_nanos(1000000000 / rate) + }; + let tsnow = Instant::now(); + let dt = tsnow.duration_since(ts_recv_last); + ts_recv_last = tsnow; + if allowed_to_drop && dt < dt_min { + //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; + stats.store_worker_ratelimit_drop_inc(); + } else { + if q2_tx.send(item).await.is_err() { + break; + } else { + stats.store_worker_item_recv_inc(); + } + } + } + info!("intermediate queue done"); + }; + tokio::spawn(fut); + } + let mut jhs = Vec::new(); let mut data_stores = Vec::new(); for _ in 0..insert_scylla_sessions { @@ -60,31 +100,19 @@ pub async fn spawn_scylla_insert_workers( for i1 in 0..insert_worker_count { let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone(); let stats = store_stats.clone(); - let recv = insert_item_queue.receiver(); - let insert_frac = insert_frac.clone(); + //let recv = insert_item_queue.receiver(); + let recv = q2_rx.clone(); let ingest_commons = ingest_commons.clone(); let fut = async move { let backoff_0 = Duration::from_millis(10); let mut backoff = backoff_0.clone(); let mut i1 = 0; - let mut ts_recv_last = Instant::now(); loop { - let tsnow = Instant::now(); - let dt = tsnow.duration_since(ts_recv_last); - let dt_min = { - let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire); - Duration::from_nanos(insert_worker_count as u64 * 1000000000 / rate) - }; - if dt < dt_min { - tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; - } let item = if let Ok(item) = recv.recv().await { item } else { break; }; - ts_recv_last = Instant::now(); - stats.store_worker_item_recv_inc(); match item { QueryItem::ConnectionStatus(item) => { match crate::store::insert_connection_status(item, &data_store, &stats).await { @@ -111,7 +139,7 @@ pub async fn spawn_scylla_insert_workers( } } QueryItem::Insert(item) => { - let insert_frac = insert_frac.load(Ordering::Acquire); + let insert_frac = ingest_commons.insert_frac.load(Ordering::Acquire); if i1 % 1000 < insert_frac { match crate::store::insert_item(item, &data_store, &stats).await { Ok(_) => { @@ -124,7 +152,7 @@ pub async fn spawn_scylla_insert_workers( } } } else { - stats.store_worker_item_drop_inc(); + stats.store_worker_fraction_drop_inc(); } i1 += 1; } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 2db0729..b20e79a 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -11,10 +11,6 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -async fn get_empty() -> String { - String::new() -} - async fn find_channel( params: HashMap, ingest_commons: Arc, @@ -141,7 +137,7 @@ async fn channel_states( async fn extra_inserts_conf_set(v: ExtraInsertsConf, ingest_commons: Arc) -> axum::Json { // TODO ingest_commons is the authorative value. Should have common function outside of this metrics which // can update everything to a given value. - *ingest_commons.extra_inserts_conf.lock().unwrap() = v.clone(); + *ingest_commons.extra_inserts_conf.lock().await = v.clone(); ingest_commons .ca_conn_set .send_command_to_all(|| ConnCommand::extra_inserts_conf_set(v.clone())) @@ -206,14 +202,45 @@ pub async fn start_metrics_service(bind_to: String, ingest_commons: Arc| async move { - insert_frac.store(v.0, Ordering::Release); + c.store_workers_rate.store(v.0, Ordering::Release); } }), ) + .route( + "/insert_frac", + get({ + let c = ingest_commons.clone(); + || async move { axum::Json(c.insert_frac.load(Ordering::Acquire)) } + }) + .put({ + let c = ingest_commons.clone(); + |v: extract::Json| async move { + c.insert_frac.store(v.0, Ordering::Release); + } + }), + ) + .route( + "/extra_inserts_conf", + get({ + let c = ingest_commons.clone(); + || async move { + let res = c.extra_inserts_conf.lock().await; + axum::Json(serde_json::to_value(&*res).unwrap()) + } + }) + .put({ + let ingest_commons = ingest_commons.clone(); + |v: extract::Json| extra_inserts_conf_set(v.0, ingest_commons) + }), + ) .route( "/insert_ivl_min", put({ @@ -223,13 +250,6 @@ pub async fn start_metrics_service(bind_to: String, ingest_commons: Arc| extra_inserts_conf_set(v.0, ingest_commons) - }), - ) .fallback( get(|parts: Parts, body: extract::RawBody| async move { let bytes = hyper::body::to_bytes(body.0).await.unwrap(); diff --git a/stats/src/stats.rs b/stats/src/stats.rs index e42bc7f..b91d0ae 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -155,7 +155,8 @@ stats_proc::stats_struct!(( store_worker_recv_queue_len, store_worker_item_recv, // TODO rename to make clear that this drop is voluntary because of user config choice: - store_worker_item_drop, + store_worker_fraction_drop, + store_worker_ratelimit_drop, store_worker_insert_done, store_worker_insert_overload, store_worker_insert_timeout,