diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 74ec170..ea09b69 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -9,10 +9,10 @@ use self::store::DataStore; use crate::ca::conn::ConnCommand; use crate::ca::connset::CaConnSet; use crate::errconv::ErrConv; +use crate::insertworker::spawn_scylla_insert_workers; use crate::linuxhelper::local_hostname; use crate::metrics::metrics_agg_task; -use crate::rt::JoinHandle; -use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem}; +use crate::store::CommonInsertItemQueue; use err::Error; use futures_util::stream::FuturesUnordered; use futures_util::{FutureExt, StreamExt}; @@ -25,7 +25,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; +use std::time::Duration; use tokio::fs::OpenOptions; use tokio::io::AsyncReadExt; use tokio_postgres::Client as PgClient; @@ -172,201 +172,6 @@ impl ExtraInsertsConf { } } -fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) { - use crate::store::Error; - match err { - Error::DbOverload => { - stats.store_worker_insert_overload_inc(); - } - Error::DbTimeout => { - stats.store_worker_insert_timeout_inc(); - } - Error::DbUnavailable => { - stats.store_worker_insert_unavailable_inc(); - } - Error::DbError(_) => { - stats.store_worker_insert_error_inc(); - } - } -} - -fn back_off_next(backoff_dt: &mut Duration) { - *backoff_dt = *backoff_dt + (*backoff_dt) * 3 / 2; - let dtmax = Duration::from_millis(4000); - if *backoff_dt > dtmax { - *backoff_dt = dtmax; - } -} - -async fn back_off_sleep(backoff_dt: &mut Duration) { - back_off_next(backoff_dt); - tokio::time::sleep(*backoff_dt).await; -} - -async fn spawn_scylla_insert_workers( - scyconf: ScyllaConfig, - insert_scylla_sessions: usize, - insert_worker_count: usize, - insert_item_queue: Arc, - insert_frac: Arc, - ingest_commons: Arc, - pg_client: Arc, - store_stats: Arc, -) -> Result>, Error> { - let mut jhs = Vec::new(); - let mut data_stores = Vec::new(); - for _ in 0..insert_scylla_sessions { - let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?); - data_stores.push(data_store); - } - for i1 in 0..insert_worker_count { - let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone(); - let stats = store_stats.clone(); - let recv = insert_item_queue.receiver(); - let insert_frac = insert_frac.clone(); - let ingest_commons = ingest_commons.clone(); - let fut = async move { - let backoff_0 = Duration::from_millis(10); - let mut backoff = backoff_0.clone(); - let mut i1 = 0; - let mut ts_recv_last = Instant::now(); - loop { - let tsnow = Instant::now(); - let dt = tsnow.duration_since(ts_recv_last); - let dt_min = { - let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire); - Duration::from_nanos(insert_worker_count as u64 * 1000000000 / rate) - }; - if dt < dt_min { - tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; - } - let item = if let Ok(item) = recv.recv().await { - item - } else { - break; - }; - ts_recv_last = Instant::now(); - stats.store_worker_item_recv_inc(); - match item { - QueryItem::ConnectionStatus(item) => { - match crate::store::insert_connection_status(item, &data_store, &stats).await { - Ok(_) => { - stats.connection_status_insert_done_inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - } - } - QueryItem::ChannelStatus(item) => { - match crate::store::insert_channel_status(item, &data_store, &stats).await { - Ok(_) => { - stats.channel_status_insert_done_inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - } - } - QueryItem::Insert(item) => { - let insert_frac = insert_frac.load(Ordering::Acquire); - if i1 % 1000 < insert_frac { - match crate::store::insert_item(item, &data_store, &stats).await { - Ok(_) => { - stats.store_worker_insert_done_inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - } - } else { - stats.store_worker_item_drop_inc(); - } - i1 += 1; - } - QueryItem::Mute(item) => { - let values = ( - (item.series.id() & 0xff) as i32, - item.series.id() as i64, - item.ts as i64, - item.ema, - item.emd, - ); - let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await; - match qres { - Ok(_) => { - stats.mute_insert_done_inc(); - backoff = backoff_0; - } - Err(e) => { - let e = e.into_simpler(); - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - } - } - QueryItem::Ivl(item) => { - let values = ( - (item.series.id() & 0xff) as i32, - item.series.id() as i64, - item.ts as i64, - item.ema, - item.emd, - ); - let qres = data_store - .scy - .execute(&data_store.qu_insert_item_recv_ivl, values) - .await; - match qres { - Ok(_) => { - stats.ivl_insert_done_inc(); - backoff = backoff_0; - } - Err(e) => { - let e = e.into_simpler(); - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - } - } - QueryItem::ChannelInfo(item) => { - let params = ( - (item.series.id() & 0xff) as i32, - item.ts_msp as i32, - item.series.id() as i64, - item.ivl, - item.interest, - item.evsize as i32, - ); - let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await; - match qres { - Ok(_) => { - stats.channel_info_insert_done_inc(); - backoff = backoff_0; - } - Err(e) => { - let e = e.into_simpler(); - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - } - } - } - } - trace!("insert worker has no more messages"); - }; - let jh = tokio::spawn(fut); - jhs.push(jh); - } - Ok(jhs) -} - pub struct IngestCommons { pub pgconf: Arc, pub local_epics_hostname: String, diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs new file mode 100644 index 0000000..aef704e --- /dev/null +++ b/netfetch/src/insertworker.rs @@ -0,0 +1,206 @@ +use crate::ca::store::DataStore; +use crate::ca::IngestCommons; +use crate::rt::JoinHandle; +use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem}; +use err::Error; +use log::trace; +use netpod::ScyllaConfig; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio_postgres::Client as PgClient; + +fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) { + use crate::store::Error; + match err { + Error::DbOverload => { + stats.store_worker_insert_overload_inc(); + } + Error::DbTimeout => { + stats.store_worker_insert_timeout_inc(); + } + Error::DbUnavailable => { + stats.store_worker_insert_unavailable_inc(); + } + Error::DbError(_) => { + stats.store_worker_insert_error_inc(); + } + } +} + +fn back_off_next(backoff_dt: &mut Duration) { + *backoff_dt = *backoff_dt + (*backoff_dt) * 3 / 2; + let dtmax = Duration::from_millis(4000); + if *backoff_dt > dtmax { + *backoff_dt = dtmax; + } +} + +async fn back_off_sleep(backoff_dt: &mut Duration) { + back_off_next(backoff_dt); + tokio::time::sleep(*backoff_dt).await; +} + +pub async fn spawn_scylla_insert_workers( + scyconf: ScyllaConfig, + insert_scylla_sessions: usize, + insert_worker_count: usize, + insert_item_queue: Arc, + insert_frac: Arc, + ingest_commons: Arc, + pg_client: Arc, + store_stats: Arc, +) -> Result>, Error> { + let mut jhs = Vec::new(); + let mut data_stores = Vec::new(); + for _ in 0..insert_scylla_sessions { + let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?); + data_stores.push(data_store); + } + for i1 in 0..insert_worker_count { + let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone(); + let stats = store_stats.clone(); + let recv = insert_item_queue.receiver(); + let insert_frac = insert_frac.clone(); + let ingest_commons = ingest_commons.clone(); + let fut = async move { + let backoff_0 = Duration::from_millis(10); + let mut backoff = backoff_0.clone(); + let mut i1 = 0; + let mut ts_recv_last = Instant::now(); + loop { + let tsnow = Instant::now(); + let dt = tsnow.duration_since(ts_recv_last); + let dt_min = { + let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire); + Duration::from_nanos(insert_worker_count as u64 * 1000000000 / rate) + }; + if dt < dt_min { + tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; + } + let item = if let Ok(item) = recv.recv().await { + item + } else { + break; + }; + ts_recv_last = Instant::now(); + stats.store_worker_item_recv_inc(); + match item { + QueryItem::ConnectionStatus(item) => { + match crate::store::insert_connection_status(item, &data_store, &stats).await { + Ok(_) => { + stats.connection_status_insert_done_inc(); + backoff = backoff_0; + } + Err(e) => { + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; + } + } + } + QueryItem::ChannelStatus(item) => { + match crate::store::insert_channel_status(item, &data_store, &stats).await { + Ok(_) => { + stats.channel_status_insert_done_inc(); + backoff = backoff_0; + } + Err(e) => { + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; + } + } + } + QueryItem::Insert(item) => { + let insert_frac = insert_frac.load(Ordering::Acquire); + if i1 % 1000 < insert_frac { + match crate::store::insert_item(item, &data_store, &stats).await { + Ok(_) => { + stats.store_worker_insert_done_inc(); + backoff = backoff_0; + } + Err(e) => { + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; + } + } + } else { + stats.store_worker_item_drop_inc(); + } + i1 += 1; + } + QueryItem::Mute(item) => { + let values = ( + (item.series.id() & 0xff) as i32, + item.series.id() as i64, + item.ts as i64, + item.ema, + item.emd, + ); + let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await; + match qres { + Ok(_) => { + stats.mute_insert_done_inc(); + backoff = backoff_0; + } + Err(e) => { + let e = e.into_simpler(); + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; + } + } + } + QueryItem::Ivl(item) => { + let values = ( + (item.series.id() & 0xff) as i32, + item.series.id() as i64, + item.ts as i64, + item.ema, + item.emd, + ); + let qres = data_store + .scy + .execute(&data_store.qu_insert_item_recv_ivl, values) + .await; + match qres { + Ok(_) => { + stats.ivl_insert_done_inc(); + backoff = backoff_0; + } + Err(e) => { + let e = e.into_simpler(); + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; + } + } + } + QueryItem::ChannelInfo(item) => { + let params = ( + (item.series.id() & 0xff) as i32, + item.ts_msp as i32, + item.series.id() as i64, + item.ivl, + item.interest, + item.evsize as i32, + ); + let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await; + match qres { + Ok(_) => { + stats.channel_info_insert_done_inc(); + backoff = backoff_0; + } + Err(e) => { + let e = e.into_simpler(); + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; + } + } + } + } + } + trace!("insert worker has no more messages"); + }; + let jh = tokio::spawn(fut); + jhs.push(jh); + } + Ok(jhs) +} diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs index d883690..ed285d3 100644 --- a/netfetch/src/netfetch.rs +++ b/netfetch/src/netfetch.rs @@ -2,6 +2,7 @@ pub mod bsread; pub mod ca; pub mod channelwriter; pub mod errconv; +pub mod insertworker; pub mod linuxhelper; pub mod metrics; pub mod netbuf; diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs index 8869aa1..778ac6b 100644 --- a/netfetch/src/series.rs +++ b/netfetch/src/series.rs @@ -66,12 +66,12 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res for _ in 0..200 { h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); let f = h.clone().finalize(); - let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); if series > i64::MAX as u64 { - series &= 0x7fffffffffffffff; + continue; } if series == 0 { - series = 1; + continue; } if series <= 0 || series > i64::MAX as u64 { return Err(Error::with_msg_no_trace(format!(