Add rate limiter
This commit is contained in:
@@ -1250,6 +1250,7 @@ impl CaConn {
|
||||
do_wake_again = true;
|
||||
}
|
||||
CaMsgTy::EventAddRes(k) => {
|
||||
self.stats.caconn_recv_data_inc();
|
||||
let res = Self::handle_event_add_res(self, k, tsnow);
|
||||
let ts2 = Instant::now();
|
||||
self.stats
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::rt::JoinHandle;
|
||||
use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem};
|
||||
use err::Error;
|
||||
use log::*;
|
||||
use netpod::timeunits::{MS, SEC};
|
||||
use netpod::ScyllaConfig;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
@@ -56,33 +57,45 @@ pub async fn spawn_scylla_insert_workers(
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
let stats = store_stats.clone();
|
||||
let recv = insert_item_queue.receiver();
|
||||
let mut ts_recv_last = Instant::now();
|
||||
let store_stats = store_stats.clone();
|
||||
let fut = async move {
|
||||
let mut ts_forward_last = Instant::now();
|
||||
let mut ivl_ema = stats::Ema64::with_k(0.00001);
|
||||
loop {
|
||||
let item = if let Ok(x) = recv.recv().await {
|
||||
x
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
let ts_received = Instant::now();
|
||||
let allowed_to_drop = match &item {
|
||||
QueryItem::Insert(_) => true,
|
||||
_ => false,
|
||||
};
|
||||
let dt_min = {
|
||||
let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire);
|
||||
Duration::from_nanos(1000000000 / rate)
|
||||
Duration::from_nanos(SEC / rate)
|
||||
};
|
||||
let tsnow = Instant::now();
|
||||
let dt = tsnow.duration_since(ts_recv_last);
|
||||
ts_recv_last = tsnow;
|
||||
if allowed_to_drop && dt < dt_min {
|
||||
let mut ema2 = ivl_ema.clone();
|
||||
{
|
||||
let dt = ts_received.duration_since(ts_forward_last);
|
||||
let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
|
||||
ema2.update(dt_ns.min(MS * 100) as f32);
|
||||
}
|
||||
let ivl2 = Duration::from_nanos(ema2.ema() as u64);
|
||||
if allowed_to_drop && ivl2 < dt_min {
|
||||
//tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
|
||||
stats.store_worker_ratelimit_drop_inc();
|
||||
} else {
|
||||
if q2_tx.send(item).await.is_err() {
|
||||
break;
|
||||
} else {
|
||||
stats.store_worker_item_recv_inc();
|
||||
let tsnow = Instant::now();
|
||||
let dt = tsnow.duration_since(ts_forward_last);
|
||||
let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
|
||||
ivl_ema.update(dt_ns.min(MS * 100) as f32);
|
||||
ts_forward_last = tsnow;
|
||||
store_stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -100,8 +113,11 @@ pub async fn spawn_scylla_insert_workers(
|
||||
for i1 in 0..insert_worker_count {
|
||||
let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone();
|
||||
let stats = store_stats.clone();
|
||||
//let recv = insert_item_queue.receiver();
|
||||
let recv = q2_rx.clone();
|
||||
let recv = if true {
|
||||
q2_rx.clone()
|
||||
} else {
|
||||
insert_item_queue.receiver()
|
||||
};
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
let fut = async move {
|
||||
let backoff_0 = Duration::from_millis(10);
|
||||
@@ -109,6 +125,7 @@ pub async fn spawn_scylla_insert_workers(
|
||||
let mut i1 = 0;
|
||||
loop {
|
||||
let item = if let Ok(item) = recv.recv().await {
|
||||
stats.store_worker_item_recv_inc();
|
||||
item
|
||||
} else {
|
||||
break;
|
||||
|
||||
@@ -6,15 +6,17 @@ const US: u64 = 1000;
|
||||
const MS: u64 = US * 1000;
|
||||
const SEC: u64 = MS * 1000;
|
||||
|
||||
pub type EMA = Ema32;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct EMA {
|
||||
pub struct Ema32 {
|
||||
ema: f32,
|
||||
emv: f32,
|
||||
k: f32,
|
||||
update_count: u64,
|
||||
}
|
||||
|
||||
impl EMA {
|
||||
impl Ema32 {
|
||||
pub fn with_k(k: f32) -> Self {
|
||||
Self {
|
||||
ema: 0.0,
|
||||
@@ -71,6 +73,71 @@ impl EMA {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Ema64 {
|
||||
ema: f64,
|
||||
emv: f64,
|
||||
k: f64,
|
||||
update_count: u64,
|
||||
}
|
||||
|
||||
impl Ema64 {
|
||||
pub fn with_k(k: f64) -> Self {
|
||||
Self {
|
||||
ema: 0.0,
|
||||
emv: 0.0,
|
||||
k,
|
||||
update_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_ema(ema: f64, k: f64) -> Self {
|
||||
Self {
|
||||
ema,
|
||||
emv: 0.0,
|
||||
k,
|
||||
update_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default() -> Self {
|
||||
Self {
|
||||
ema: 0.0,
|
||||
emv: 0.0,
|
||||
k: 0.05,
|
||||
update_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn update<V>(&mut self, v: V)
|
||||
where
|
||||
V: Into<f64>,
|
||||
{
|
||||
self.update_count += 1;
|
||||
let k = self.k;
|
||||
let dv = v.into() - self.ema;
|
||||
self.ema += k * dv;
|
||||
self.emv = (1f64 - k) * (self.emv + k * dv * dv);
|
||||
}
|
||||
|
||||
pub fn update_count(&self) -> u64 {
|
||||
self.update_count
|
||||
}
|
||||
|
||||
pub fn ema(&self) -> f64 {
|
||||
self.ema
|
||||
}
|
||||
|
||||
pub fn emv(&self) -> f64 {
|
||||
self.emv
|
||||
}
|
||||
|
||||
pub fn k(&self) -> f64 {
|
||||
self.k
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CheckEvery {
|
||||
ts_last: Instant,
|
||||
dt: Duration,
|
||||
@@ -153,6 +220,7 @@ stats_proc::stats_struct!((
|
||||
inserts_queue_drop,
|
||||
channel_fast_item_drop,
|
||||
store_worker_recv_queue_len,
|
||||
// TODO maybe rename: this is now only the recv of the intermediate queue:
|
||||
store_worker_item_recv,
|
||||
// TODO rename to make clear that this drop is voluntary because of user config choice:
|
||||
store_worker_fraction_drop,
|
||||
@@ -173,6 +241,7 @@ stats_proc::stats_struct!((
|
||||
caconn_loop3_count,
|
||||
caconn_loop4_count,
|
||||
caconn_command_can_not_reply,
|
||||
caconn_recv_data,
|
||||
time_handle_conn_listen,
|
||||
time_handle_peer_ready,
|
||||
time_check_channels_state_init,
|
||||
@@ -189,6 +258,7 @@ stats_proc::stats_struct!((
|
||||
ca_ts_off_2,
|
||||
ca_ts_off_3,
|
||||
ca_ts_off_4,
|
||||
inter_ivl_ema,
|
||||
),
|
||||
),
|
||||
agg(name(CaConnStatsAgg), parent(CaConnStats)),
|
||||
|
||||
Reference in New Issue
Block a user