Improve rate limit

This commit is contained in:
Dominik Werder
2022-09-30 16:17:45 +02:00
parent 21f5d60f61
commit b4912b538d
4 changed files with 94 additions and 43 deletions

View File

@@ -3,9 +3,9 @@ use crate::ca::IngestCommons;
use crate::rt::JoinHandle;
use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem};
use err::Error;
use log::trace;
use log::*;
use netpod::ScyllaConfig;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_postgres::Client as PgClient;
@@ -22,7 +22,8 @@ fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) {
Error::DbUnavailable => {
stats.store_worker_insert_unavailable_inc();
}
Error::DbError(_) => {
Error::DbError(e) => {
warn!("db error {e}");
stats.store_worker_insert_error_inc();
}
}
@@ -46,11 +47,50 @@ pub async fn spawn_scylla_insert_workers(
insert_scylla_sessions: usize,
insert_worker_count: usize,
insert_item_queue: Arc<CommonInsertItemQueue>,
insert_frac: Arc<AtomicU64>,
ingest_commons: Arc<IngestCommons>,
pg_client: Arc<PgClient>,
store_stats: Arc<stats::CaConnStats>,
) -> Result<Vec<JoinHandle<()>>, Error> {
let (q2_tx, q2_rx) = async_channel::bounded(insert_item_queue.receiver().capacity().unwrap_or(20000));
{
let ingest_commons = ingest_commons.clone();
let stats = store_stats.clone();
let recv = insert_item_queue.receiver();
let mut ts_recv_last = Instant::now();
let fut = async move {
loop {
let item = if let Ok(x) = recv.recv().await {
x
} else {
break;
};
let allowed_to_drop = match &item {
QueryItem::Insert(_) => true,
_ => false,
};
let dt_min = {
let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire);
Duration::from_nanos(1000000000 / rate)
};
let tsnow = Instant::now();
let dt = tsnow.duration_since(ts_recv_last);
ts_recv_last = tsnow;
if allowed_to_drop && dt < dt_min {
//tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
stats.store_worker_ratelimit_drop_inc();
} else {
if q2_tx.send(item).await.is_err() {
break;
} else {
stats.store_worker_item_recv_inc();
}
}
}
info!("intermediate queue done");
};
tokio::spawn(fut);
}
let mut jhs = Vec::new();
let mut data_stores = Vec::new();
for _ in 0..insert_scylla_sessions {
@@ -60,31 +100,19 @@ pub async fn spawn_scylla_insert_workers(
for i1 in 0..insert_worker_count {
let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone();
let stats = store_stats.clone();
let recv = insert_item_queue.receiver();
let insert_frac = insert_frac.clone();
//let recv = insert_item_queue.receiver();
let recv = q2_rx.clone();
let ingest_commons = ingest_commons.clone();
let fut = async move {
let backoff_0 = Duration::from_millis(10);
let mut backoff = backoff_0.clone();
let mut i1 = 0;
let mut ts_recv_last = Instant::now();
loop {
let tsnow = Instant::now();
let dt = tsnow.duration_since(ts_recv_last);
let dt_min = {
let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire);
Duration::from_nanos(insert_worker_count as u64 * 1000000000 / rate)
};
if dt < dt_min {
tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
}
let item = if let Ok(item) = recv.recv().await {
item
} else {
break;
};
ts_recv_last = Instant::now();
stats.store_worker_item_recv_inc();
match item {
QueryItem::ConnectionStatus(item) => {
match crate::store::insert_connection_status(item, &data_store, &stats).await {
@@ -111,7 +139,7 @@ pub async fn spawn_scylla_insert_workers(
}
}
QueryItem::Insert(item) => {
let insert_frac = insert_frac.load(Ordering::Acquire);
let insert_frac = ingest_commons.insert_frac.load(Ordering::Acquire);
if i1 % 1000 < insert_frac {
match crate::store::insert_item(item, &data_store, &stats).await {
Ok(_) => {
@@ -124,7 +152,7 @@ pub async fn spawn_scylla_insert_workers(
}
}
} else {
stats.store_worker_item_drop_inc();
stats.store_worker_fraction_drop_inc();
}
i1 += 1;
}