Factor out inserter
This commit is contained in:
@@ -9,10 +9,10 @@ use self::store::DataStore;
|
||||
use crate::ca::conn::ConnCommand;
|
||||
use crate::ca::connset::CaConnSet;
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::insertworker::spawn_scylla_insert_workers;
|
||||
use crate::linuxhelper::local_hostname;
|
||||
use crate::metrics::metrics_agg_task;
|
||||
use crate::rt::JoinHandle;
|
||||
use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem};
|
||||
use crate::store::CommonInsertItemQueue;
|
||||
use err::Error;
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
use futures_util::{FutureExt, StreamExt};
|
||||
@@ -25,7 +25,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Duration;
|
||||
use tokio::fs::OpenOptions;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio_postgres::Client as PgClient;
|
||||
@@ -172,201 +172,6 @@ impl ExtraInsertsConf {
|
||||
}
|
||||
}
|
||||
|
||||
fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) {
|
||||
use crate::store::Error;
|
||||
match err {
|
||||
Error::DbOverload => {
|
||||
stats.store_worker_insert_overload_inc();
|
||||
}
|
||||
Error::DbTimeout => {
|
||||
stats.store_worker_insert_timeout_inc();
|
||||
}
|
||||
Error::DbUnavailable => {
|
||||
stats.store_worker_insert_unavailable_inc();
|
||||
}
|
||||
Error::DbError(_) => {
|
||||
stats.store_worker_insert_error_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn back_off_next(backoff_dt: &mut Duration) {
|
||||
*backoff_dt = *backoff_dt + (*backoff_dt) * 3 / 2;
|
||||
let dtmax = Duration::from_millis(4000);
|
||||
if *backoff_dt > dtmax {
|
||||
*backoff_dt = dtmax;
|
||||
}
|
||||
}
|
||||
|
||||
async fn back_off_sleep(backoff_dt: &mut Duration) {
|
||||
back_off_next(backoff_dt);
|
||||
tokio::time::sleep(*backoff_dt).await;
|
||||
}
|
||||
|
||||
async fn spawn_scylla_insert_workers(
|
||||
scyconf: ScyllaConfig,
|
||||
insert_scylla_sessions: usize,
|
||||
insert_worker_count: usize,
|
||||
insert_item_queue: Arc<CommonInsertItemQueue>,
|
||||
insert_frac: Arc<AtomicU64>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
pg_client: Arc<PgClient>,
|
||||
store_stats: Arc<stats::CaConnStats>,
|
||||
) -> Result<Vec<JoinHandle<()>>, Error> {
|
||||
let mut jhs = Vec::new();
|
||||
let mut data_stores = Vec::new();
|
||||
for _ in 0..insert_scylla_sessions {
|
||||
let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?);
|
||||
data_stores.push(data_store);
|
||||
}
|
||||
for i1 in 0..insert_worker_count {
|
||||
let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone();
|
||||
let stats = store_stats.clone();
|
||||
let recv = insert_item_queue.receiver();
|
||||
let insert_frac = insert_frac.clone();
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
let fut = async move {
|
||||
let backoff_0 = Duration::from_millis(10);
|
||||
let mut backoff = backoff_0.clone();
|
||||
let mut i1 = 0;
|
||||
let mut ts_recv_last = Instant::now();
|
||||
loop {
|
||||
let tsnow = Instant::now();
|
||||
let dt = tsnow.duration_since(ts_recv_last);
|
||||
let dt_min = {
|
||||
let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire);
|
||||
Duration::from_nanos(insert_worker_count as u64 * 1000000000 / rate)
|
||||
};
|
||||
if dt < dt_min {
|
||||
tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
|
||||
}
|
||||
let item = if let Ok(item) = recv.recv().await {
|
||||
item
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
ts_recv_last = Instant::now();
|
||||
stats.store_worker_item_recv_inc();
|
||||
match item {
|
||||
QueryItem::ConnectionStatus(item) => {
|
||||
match crate::store::insert_connection_status(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.connection_status_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::ChannelStatus(item) => {
|
||||
match crate::store::insert_channel_status(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.channel_status_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::Insert(item) => {
|
||||
let insert_frac = insert_frac.load(Ordering::Acquire);
|
||||
if i1 % 1000 < insert_frac {
|
||||
match crate::store::insert_item(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
stats.store_worker_item_drop_inc();
|
||||
}
|
||||
i1 += 1;
|
||||
}
|
||||
QueryItem::Mute(item) => {
|
||||
let values = (
|
||||
(item.series.id() & 0xff) as i32,
|
||||
item.series.id() as i64,
|
||||
item.ts as i64,
|
||||
item.ema,
|
||||
item.emd,
|
||||
);
|
||||
let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.mute_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into_simpler();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::Ivl(item) => {
|
||||
let values = (
|
||||
(item.series.id() & 0xff) as i32,
|
||||
item.series.id() as i64,
|
||||
item.ts as i64,
|
||||
item.ema,
|
||||
item.emd,
|
||||
);
|
||||
let qres = data_store
|
||||
.scy
|
||||
.execute(&data_store.qu_insert_item_recv_ivl, values)
|
||||
.await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.ivl_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into_simpler();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::ChannelInfo(item) => {
|
||||
let params = (
|
||||
(item.series.id() & 0xff) as i32,
|
||||
item.ts_msp as i32,
|
||||
item.series.id() as i64,
|
||||
item.ivl,
|
||||
item.interest,
|
||||
item.evsize as i32,
|
||||
);
|
||||
let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.channel_info_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into_simpler();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("insert worker has no more messages");
|
||||
};
|
||||
let jh = tokio::spawn(fut);
|
||||
jhs.push(jh);
|
||||
}
|
||||
Ok(jhs)
|
||||
}
|
||||
|
||||
pub struct IngestCommons {
|
||||
pub pgconf: Arc<Database>,
|
||||
pub local_epics_hostname: String,
|
||||
|
||||
206
netfetch/src/insertworker.rs
Normal file
206
netfetch/src/insertworker.rs
Normal file
@@ -0,0 +1,206 @@
|
||||
use crate::ca::store::DataStore;
|
||||
use crate::ca::IngestCommons;
|
||||
use crate::rt::JoinHandle;
|
||||
use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem};
|
||||
use err::Error;
|
||||
use log::trace;
|
||||
use netpod::ScyllaConfig;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio_postgres::Client as PgClient;
|
||||
|
||||
fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) {
|
||||
use crate::store::Error;
|
||||
match err {
|
||||
Error::DbOverload => {
|
||||
stats.store_worker_insert_overload_inc();
|
||||
}
|
||||
Error::DbTimeout => {
|
||||
stats.store_worker_insert_timeout_inc();
|
||||
}
|
||||
Error::DbUnavailable => {
|
||||
stats.store_worker_insert_unavailable_inc();
|
||||
}
|
||||
Error::DbError(_) => {
|
||||
stats.store_worker_insert_error_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn back_off_next(backoff_dt: &mut Duration) {
|
||||
*backoff_dt = *backoff_dt + (*backoff_dt) * 3 / 2;
|
||||
let dtmax = Duration::from_millis(4000);
|
||||
if *backoff_dt > dtmax {
|
||||
*backoff_dt = dtmax;
|
||||
}
|
||||
}
|
||||
|
||||
async fn back_off_sleep(backoff_dt: &mut Duration) {
|
||||
back_off_next(backoff_dt);
|
||||
tokio::time::sleep(*backoff_dt).await;
|
||||
}
|
||||
|
||||
pub async fn spawn_scylla_insert_workers(
|
||||
scyconf: ScyllaConfig,
|
||||
insert_scylla_sessions: usize,
|
||||
insert_worker_count: usize,
|
||||
insert_item_queue: Arc<CommonInsertItemQueue>,
|
||||
insert_frac: Arc<AtomicU64>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
pg_client: Arc<PgClient>,
|
||||
store_stats: Arc<stats::CaConnStats>,
|
||||
) -> Result<Vec<JoinHandle<()>>, Error> {
|
||||
let mut jhs = Vec::new();
|
||||
let mut data_stores = Vec::new();
|
||||
for _ in 0..insert_scylla_sessions {
|
||||
let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?);
|
||||
data_stores.push(data_store);
|
||||
}
|
||||
for i1 in 0..insert_worker_count {
|
||||
let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone();
|
||||
let stats = store_stats.clone();
|
||||
let recv = insert_item_queue.receiver();
|
||||
let insert_frac = insert_frac.clone();
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
let fut = async move {
|
||||
let backoff_0 = Duration::from_millis(10);
|
||||
let mut backoff = backoff_0.clone();
|
||||
let mut i1 = 0;
|
||||
let mut ts_recv_last = Instant::now();
|
||||
loop {
|
||||
let tsnow = Instant::now();
|
||||
let dt = tsnow.duration_since(ts_recv_last);
|
||||
let dt_min = {
|
||||
let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire);
|
||||
Duration::from_nanos(insert_worker_count as u64 * 1000000000 / rate)
|
||||
};
|
||||
if dt < dt_min {
|
||||
tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
|
||||
}
|
||||
let item = if let Ok(item) = recv.recv().await {
|
||||
item
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
ts_recv_last = Instant::now();
|
||||
stats.store_worker_item_recv_inc();
|
||||
match item {
|
||||
QueryItem::ConnectionStatus(item) => {
|
||||
match crate::store::insert_connection_status(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.connection_status_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::ChannelStatus(item) => {
|
||||
match crate::store::insert_channel_status(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.channel_status_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::Insert(item) => {
|
||||
let insert_frac = insert_frac.load(Ordering::Acquire);
|
||||
if i1 % 1000 < insert_frac {
|
||||
match crate::store::insert_item(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
stats.store_worker_item_drop_inc();
|
||||
}
|
||||
i1 += 1;
|
||||
}
|
||||
QueryItem::Mute(item) => {
|
||||
let values = (
|
||||
(item.series.id() & 0xff) as i32,
|
||||
item.series.id() as i64,
|
||||
item.ts as i64,
|
||||
item.ema,
|
||||
item.emd,
|
||||
);
|
||||
let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.mute_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into_simpler();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::Ivl(item) => {
|
||||
let values = (
|
||||
(item.series.id() & 0xff) as i32,
|
||||
item.series.id() as i64,
|
||||
item.ts as i64,
|
||||
item.ema,
|
||||
item.emd,
|
||||
);
|
||||
let qres = data_store
|
||||
.scy
|
||||
.execute(&data_store.qu_insert_item_recv_ivl, values)
|
||||
.await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.ivl_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into_simpler();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::ChannelInfo(item) => {
|
||||
let params = (
|
||||
(item.series.id() & 0xff) as i32,
|
||||
item.ts_msp as i32,
|
||||
item.series.id() as i64,
|
||||
item.ivl,
|
||||
item.interest,
|
||||
item.evsize as i32,
|
||||
);
|
||||
let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.channel_info_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into_simpler();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("insert worker has no more messages");
|
||||
};
|
||||
let jh = tokio::spawn(fut);
|
||||
jhs.push(jh);
|
||||
}
|
||||
Ok(jhs)
|
||||
}
|
||||
@@ -2,6 +2,7 @@ pub mod bsread;
|
||||
pub mod ca;
|
||||
pub mod channelwriter;
|
||||
pub mod errconv;
|
||||
pub mod insertworker;
|
||||
pub mod linuxhelper;
|
||||
pub mod metrics;
|
||||
pub mod netbuf;
|
||||
|
||||
@@ -66,12 +66,12 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res
|
||||
for _ in 0..200 {
|
||||
h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes());
|
||||
let f = h.clone().finalize();
|
||||
let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
|
||||
let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
|
||||
if series > i64::MAX as u64 {
|
||||
series &= 0x7fffffffffffffff;
|
||||
continue;
|
||||
}
|
||||
if series == 0 {
|
||||
series = 1;
|
||||
continue;
|
||||
}
|
||||
if series <= 0 || series > i64::MAX as u64 {
|
||||
return Err(Error::with_msg_no_trace(format!(
|
||||
|
||||
Reference in New Issue
Block a user