Alternative insert queue worker

This commit is contained in:
Dominik Werder
2023-09-19 22:27:22 +02:00
parent 3792010724
commit 5fa77acf5c
10 changed files with 353 additions and 263 deletions

View File

@@ -619,6 +619,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
daemon_stats,
conn_set_stats,
ca_conn_stats,
daemon.connset_ctrl.ca_proto_stats().clone(),
daemon.insert_worker_stats.clone(),
daemon.series_by_channel_stats.clone(),
insert_frac,

View File

@@ -40,6 +40,7 @@ use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IntervalEma;
use std::collections::BTreeMap;
use std::collections::VecDeque;
@@ -502,8 +503,10 @@ pub struct CaConn {
channel_info_query_sending: SenderPolling<ChannelInfoQuery>,
time_binners: BTreeMap<Cid, ConnTimeBin>,
thr_msg_poll: ThrottleTrace,
ca_proto_stats: Arc<CaProtoStats>,
}
#[cfg(DISABLED)]
impl Drop for CaConn {
fn drop(&mut self) {
debug!("~~~~~~~~~~~~~~~ Drop CaConn {}", self.remote_addr_dbg);
@@ -519,6 +522,7 @@ impl CaConn {
storage_insert_tx: Sender<QueryItem>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
) -> Self {
let (cq_tx, cq_rx) = async_channel::bounded(32);
Self {
@@ -554,6 +558,7 @@ impl CaConn {
channel_info_query_sending: SenderPolling::new(channel_info_query_tx),
time_binners: BTreeMap::new(),
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)),
ca_proto_stats,
}
}
@@ -1614,7 +1619,12 @@ impl CaConn {
status: ConnectionStatus::Established,
}));
self.backoff_reset();
let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.opts.array_truncate);
let proto = CaProto::new(
tcp,
self.remote_addr_dbg.clone(),
self.opts.array_truncate,
self.ca_proto_stats.clone(),
);
self.state = CaConnState::Init;
self.proto = Some(proto);
Ok(Ready(Some(())))

View File

@@ -47,6 +47,7 @@ use statemap::WithStatusSeriesIdStateInner;
use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaProtoStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
@@ -214,6 +215,7 @@ pub struct CaConnSetCtrl {
rx: Receiver<CaConnSetItem>,
stats: Arc<CaConnSetStats>,
ca_conn_stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
jh: JoinHandle<Result<(), Error>>,
}
@@ -268,6 +270,10 @@ impl CaConnSetCtrl {
pub fn ca_conn_stats(&self) -> &Arc<CaConnStats> {
&self.ca_conn_stats
}
pub fn ca_proto_stats(&self) -> &Arc<CaProtoStats> {
&self.ca_proto_stats
}
}
#[derive(Debug)]
@@ -323,6 +329,7 @@ pub struct CaConnSet {
thr_msg_poll_1: ThrottleTrace,
thr_msg_storage_len: ThrottleTrace,
did_connset_out_queue: bool,
ca_proto_stats: Arc<CaProtoStats>,
}
impl CaConnSet {
@@ -341,10 +348,8 @@ impl CaConnSet {
super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf);
let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400);
let stats = Arc::new(CaConnSetStats::new());
let ca_proto_stats = Arc::new(CaProtoStats::new());
let ca_conn_stats = Arc::new(CaConnStats::new());
stats.test_1().inc();
stats.test_1().inc();
stats.test_1().inc();
let connset = Self {
backend,
local_epics_hostname,
@@ -378,6 +383,7 @@ impl CaConnSet {
thr_msg_poll_1: ThrottleTrace::new(Duration::from_millis(2000)),
thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)),
did_connset_out_queue: false,
ca_proto_stats: ca_proto_stats.clone(),
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
@@ -386,6 +392,7 @@ impl CaConnSet {
rx: connset_out_rx,
stats,
ca_conn_stats,
ca_proto_stats,
jh,
}
}
@@ -766,6 +773,7 @@ impl CaConnSet {
self.storage_insert_tx.clone(),
self.channel_info_query_tx.clone(),
self.ca_conn_stats.clone(),
self.ca_proto_stats.clone(),
);
let conn_tx = conn.conn_command_tx();
let conn_stats = conn.stats();
@@ -817,6 +825,7 @@ impl CaConnSet {
Err(e) => {
error!("CaConn gives error: {e:?}");
ret = Err(e);
break;
}
}
}

View File

@@ -6,6 +6,7 @@ use futures_util::Stream;
use log::*;
use netpod::timeunits::*;
use slidebuf::SlideBuf;
use stats::CaProtoStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::io;
@@ -13,6 +14,7 @@ use std::net::SocketAddrV4;
use std::num::NonZeroU16;
use std::num::NonZeroU64;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use taskrun::tokio;
@@ -948,19 +950,21 @@ pub struct CaProto {
out: VecDeque<CaMsg>,
array_truncate: usize,
logged_proto_error_for_cid: BTreeMap<u32, bool>,
stats: Arc<CaProtoStats>,
}
impl CaProto {
pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, array_truncate: usize) -> Self {
pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, array_truncate: usize, stats: Arc<CaProtoStats>) -> Self {
Self {
tcp,
remote_addr_dbg,
state: CaState::StdHead,
buf: SlideBuf::new(1024 * 128),
buf: SlideBuf::new(1024 * 512),
outbuf: SlideBuf::new(1024 * 128),
out: VecDeque::new(),
array_truncate,
logged_proto_error_for_cid: BTreeMap::new(),
stats,
}
}
@@ -1083,7 +1087,11 @@ impl CaProto {
info!("received data {:?}", &rbuf.filled()[0..t]);
}
match self.buf.wadv(nf) {
Ok(()) => Ok(Some(Ready(CaItem::empty()))),
Ok(()) => {
self.stats.tcp_recv_bytes().add(nf as _);
self.stats.tcp_recv_count().inc();
Ok(Some(Ready(CaItem::empty())))
}
Err(e) => {
error!("netbuf wadv fail nf {nf}");
Err(e.into())

View File

@@ -18,6 +18,7 @@ use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaConnStatsAgg;
use stats::CaConnStatsAggDiff;
use stats::CaProtoStats;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::SeriesByChannelStats;
@@ -33,6 +34,7 @@ pub struct StatsSet {
daemon: Arc<DaemonStats>,
ca_conn_set: Arc<CaConnSetStats>,
ca_conn: Arc<CaConnStats>,
ca_proto: Arc<CaProtoStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
insert_frac: Arc<AtomicU64>,
@@ -43,6 +45,7 @@ impl StatsSet {
daemon: Arc<DaemonStats>,
ca_conn_set: Arc<CaConnSetStats>,
ca_conn: Arc<CaConnStats>,
ca_proto: Arc<CaProtoStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
insert_frac: Arc<AtomicU64>,
@@ -51,6 +54,7 @@ impl StatsSet {
daemon,
ca_conn_set,
ca_conn,
ca_proto,
insert_worker_stats,
series_by_channel_stats,
insert_frac,
@@ -215,11 +219,8 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
let s3 = stats_set.insert_worker_stats.prometheus();
let s4 = stats_set.ca_conn.prometheus();
let s5 = stats_set.series_by_channel_stats.prometheus();
s1.push_str(&s2);
s1.push_str(&s3);
s1.push_str(&s4);
s1.push_str(&s5);
s1
let s6 = stats_set.ca_proto.prometheus();
[s1, s2, s3, s4, s5, s6].join("")
}
}),
)

View File

@@ -1,9 +1,13 @@
use crate::iteminsertqueue::insert_channel_status;
use crate::iteminsertqueue::insert_channel_status_fut;
use crate::iteminsertqueue::insert_connection_status;
use crate::iteminsertqueue::insert_connection_status_fut;
use crate::iteminsertqueue::insert_item;
use crate::iteminsertqueue::insert_item_fut;
use crate::iteminsertqueue::insert_msp_fut;
use crate::iteminsertqueue::ConnectionStatusItem;
use crate::iteminsertqueue::InsertFut;
use crate::iteminsertqueue::InsertItem;
use crate::iteminsertqueue::QueryItem;
use crate::store::DataStore;
use async_channel::Receiver;
@@ -15,6 +19,8 @@ use log::*;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::ScyllaConfig;
use smallvec::smallvec;
use smallvec::SmallVec;
use stats::InsertWorkerStats;
use std::pin::Pin;
use std::sync::atomic;
@@ -88,65 +94,6 @@ pub struct InsertWorkerOpts {
pub array_truncate: Arc<AtomicU64>,
}
async fn rate_limiter_worker(
rate: Arc<AtomicU64>,
inp: Receiver<QueryItem>,
tx: Sender<QueryItem>,
stats: Arc<stats::InsertWorkerStats>,
) {
let mut ts_forward_last = Instant::now();
let mut ivl_ema = stats::Ema64::with_k(0.00001);
loop {
let item = if let Ok(x) = inp.recv().await {
x
} else {
break;
};
let ts_received = Instant::now();
let allowed_to_drop = match &item {
QueryItem::Insert(_) => true,
_ => false,
};
let dt_min = {
let rate2 = rate.load(Ordering::Acquire);
Duration::from_nanos(SEC / rate2)
};
let mut ema2 = ivl_ema.clone();
{
let dt = ts_received.duration_since(ts_forward_last);
let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
ema2.update(dt_ns.min(MS * 100) as f32);
}
let ivl2 = Duration::from_nanos(ema2.ema() as u64);
if allowed_to_drop && ivl2 < dt_min {
//tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
stats.ratelimit_drop().inc();
} else {
if tx.send(item).await.is_err() {
break;
} else {
let tsnow = Instant::now();
let dt = tsnow.duration_since(ts_forward_last);
let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
ivl_ema.update(dt_ns.min(MS * 100) as f32);
ts_forward_last = tsnow;
// stats.inter_ivl_ema.set(ivl_ema.ema() as u64);
}
}
}
info!("rate limiter done");
}
fn rate_limiter(
inp: Receiver<QueryItem>,
opts: Arc<InsertWorkerOpts>,
stats: Arc<stats::InsertWorkerStats>,
) -> Receiver<QueryItem> {
let (tx, rx) = async_channel::bounded(inp.capacity().unwrap_or(256));
tokio::spawn(rate_limiter_worker(opts.store_workers_rate.clone(), inp, tx, stats));
rx
}
pub async fn spawn_scylla_insert_workers(
scyconf: ScyllaConfig,
insert_scylla_sessions: usize,
@@ -158,7 +105,7 @@ pub async fn spawn_scylla_insert_workers(
ttls: Ttls,
) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
let item_inp = if use_rate_limit_queue {
rate_limiter(item_inp, insert_worker_opts.clone(), store_stats.clone())
crate::ratelimit::rate_limiter(insert_worker_opts.store_workers_rate.clone(), item_inp)
} else {
item_inp
};
@@ -170,6 +117,7 @@ pub async fn spawn_scylla_insert_workers(
}
for worker_ix in 0..insert_worker_count {
let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone();
#[cfg(DISABLED)]
let jh = tokio::spawn(worker(
worker_ix,
item_inp.clone(),
@@ -178,20 +126,21 @@ pub async fn spawn_scylla_insert_workers(
data_store,
store_stats.clone(),
));
// let jh = tokio::spawn(worker_streamed(
// worker_ix,
// insert_worker_count * 3,
// item_inp.clone(),
// ttls.clone(),
// insert_worker_opts.clone(),
// data_store,
// store_stats.clone(),
// ));
let jh = tokio::spawn(worker_streamed(
worker_ix,
insert_worker_count * 3,
item_inp.clone(),
ttls.clone(),
insert_worker_opts.clone(),
data_store,
store_stats.clone(),
));
jhs.push(jh);
}
Ok(jhs)
}
#[allow(unused)]
async fn worker(
worker_ix: usize,
item_inp: Receiver<QueryItem>,
@@ -215,30 +164,26 @@ async fn worker(
break;
};
match item {
QueryItem::ConnectionStatus(item) => {
match insert_connection_status(item, ttls.index, &data_store, &stats).await {
Ok(_) => {
stats.inserted_connection_status().inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
QueryItem::ConnectionStatus(item) => match insert_connection_status(item, ttls.index, &data_store).await {
Ok(_) => {
stats.inserted_connection_status().inc();
backoff = backoff_0;
}
}
QueryItem::ChannelStatus(item) => {
match insert_channel_status(item, ttls.index, &data_store, &stats).await {
Ok(_) => {
stats.inserted_channel_status().inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
},
QueryItem::ChannelStatus(item) => match insert_channel_status(item, ttls.index, &data_store).await {
Ok(_) => {
stats.inserted_channel_status().inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
},
QueryItem::Insert(item) => {
let item_ts_local = item.ts_local.clone();
let tsnow = {
@@ -420,73 +365,49 @@ async fn worker_streamed(
.insert_workers_running
.fetch_add(1, atomic::Ordering::AcqRel);
let mut stream = item_inp
.map(|item| match item {
QueryItem::Insert(item) => {
stats.item_recv.inc();
// let mut futs: smallvec::SmallVec<
// [Pin<Box<dyn Future<Output = Result<(), crate::iteminsertqueue::Error>> + Send>>; 4],
// > = smallvec::smallvec![];
let mut futs: Vec<Pin<Box<dyn Future<Output = Result<(), crate::iteminsertqueue::Error>> + Send>>> =
Vec::new();
if item.msp_bump {
stats.inserts_msp().inc();
let fut = insert_msp_fut(
item.series.clone(),
item.ts_msp,
&ttls,
&data_store.scy,
&data_store.qu_insert_ts_msp,
);
// futs.push(Box::pin(fut));
.map(|item| {
stats.item_recv.inc();
match item {
QueryItem::Insert(item) => prepare_query_insert_futs(item, &ttls, &data_store, &stats),
QueryItem::ConnectionStatus(item) => {
stats.inserted_connection_status().inc();
let fut = insert_connection_status_fut(item, &ttls, &data_store);
smallvec![fut]
}
#[cfg(DISABLED)]
if let Some(ts_msp_grid) = item.ts_msp_grid {
let params = (
(item.series.id() as i32) & 0xff,
ts_msp_grid as i32,
if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32,
item.scalar_type.to_scylla_i32(),
item.series.id() as i64,
ttls.index.as_secs() as i32,
);
data_store
.scy
.execute(&data_store.qu_insert_series_by_ts_msp, params)
.await?;
stats.inserts_msp_grid().inc();
QueryItem::ChannelStatus(item) => {
stats.inserted_channel_status().inc();
insert_channel_status_fut(item, &ttls, &data_store)
}
_ => {
// TODO
SmallVec::new()
}
let do_insert = true;
// TODO prepare db future and pass-through.
stats.inserts_value().inc();
let fut = insert_item_fut(item, &ttls, &data_store, do_insert);
// .map_err(|e| Error::with_msg_no_trace(e.to_string()))
let fut = tokio::task::unconstrained(fut);
// futs.push(Box::pin(fut));
futs
}
_ => {
// TODO
Vec::new()
}
})
.map(|mut x| async move { x.pop().unwrap().await })
// .map(|x| futures_util::stream::iter(x))
// .flatten_unordered(None)
.buffer_unordered(concurrency)
.map(|x| x);
.map(|x| futures_util::stream::iter(x))
.flatten_unordered(Some(1))
.buffer_unordered(concurrency);
while let Some(item) = stream.next().await {
match item {
Ok(()) => {
Ok(_) => {
stats.inserted_values().inc();
// TODO compute the insert latency bin and count.
}
Err(e) => {
use scylla::transport::errors::QueryError;
let e = match e {
QueryError::TimeoutError => crate::iteminsertqueue::Error::DbTimeout,
// TODO use `msg`
QueryError::DbError(e, _msg) => match e {
scylla::transport::errors::DbError::Overloaded => crate::iteminsertqueue::Error::DbOverload,
_ => e.into(),
},
_ => e.into(),
};
stats_inc_for_err(&stats, &e);
}
}
}
stats.worker_finish().inc();
insert_worker_opts
.insert_workers_running
@@ -494,3 +415,46 @@ async fn worker_streamed(
trace2!("insert worker {worker_ix} done");
Ok(())
}
/// Build the db insert future(s) for a single value-insert item.
///
/// Always produces the value insert itself; when `item.msp_bump` is set, a
/// second future inserting the new ts_msp row is appended, so the caller must
/// await every element of the returned vec. Stats counters are bumped here,
/// at future-construction time, not at completion time.
fn prepare_query_insert_futs(
    item: InsertItem,
    ttls: &Ttls,
    data_store: &Arc<DataStore>,
    stats: &InsertWorkerStats,
) -> SmallVec<[InsertFut; 4]> {
    stats.inserts_value().inc();
    // Capture what the msp insert needs before `item` is moved below.
    let msp_bump = item.msp_bump;
    let series = item.series.clone();
    let ts_msp = item.ts_msp;
    let do_insert = true;
    let fut = insert_item_fut(item, &ttls, &data_store, do_insert);
    let mut futs = smallvec![fut];
    if msp_bump {
        stats.inserts_msp().inc();
        let fut = insert_msp_fut(
            series,
            ts_msp,
            ttls,
            data_store.scy.clone(),
            data_store.qu_insert_ts_msp.clone(),
        );
        futs.push(fut);
    }
    // Disabled msp-grid insert, compiled out via cfg. NOTE(review): this body
    // still uses `.await?` and would not compile if re-enabled inside this
    // non-async fn — it needs porting to the InsertFut style first.
    #[cfg(DISABLED)]
    if let Some(ts_msp_grid) = item.ts_msp_grid {
        let params = (
            (item.series.id() as i32) & 0xff,
            ts_msp_grid as i32,
            if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32,
            item.scalar_type.to_scylla_i32(),
            item.series.id() as i64,
            ttls.index.as_secs() as i32,
        );
        data_store
            .scy
            .execute(&data_store.qu_insert_series_by_ts_msp, params)
            .await?;
        stats.inserts_msp_grid().inc();
    }
    futs
}

View File

@@ -7,21 +7,26 @@ use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::TryFutureExt;
use netpod::timeunits::SEC;
use netpod::ScalarType;
use netpod::Shape;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use series::SeriesId;
use smallvec::smallvec;
use smallvec::SmallVec;
use stats::InsertWorkerStats;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
#[derive(Debug, ThisError)]
pub enum Error {
@@ -254,26 +259,9 @@ struct InsParCom {
do_insert: bool,
}
fn insert_scalar_gen_fut<ST>(
par: InsParCom,
val: ST,
qu: &Arc<PreparedStatement>,
scy: &Arc<ScySession>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
fn insert_scalar_gen_fut<ST>(par: InsParCom, val: ST, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut
where
ST: scylla::frame::value::Value + Send + 'static,
{
Box::pin(insert_scalar_gen_fut_inner(par, val, qu.clone(), scy.clone()))
}
async fn insert_scalar_gen_fut_inner<ST>(
par: InsParCom,
val: ST,
qu: Arc<PreparedStatement>,
scy: Arc<ScySession>,
) -> Result<(), Error>
where
ST: scylla::frame::value::Value,
{
let params = (
par.series as i64,
@@ -283,42 +271,52 @@ where
val,
par.ttl as i32,
);
scy.execute(&qu, params)
.map(|item| {
match item {
Ok(_) => Ok(()),
Err(e) => match e {
QueryError::TimeoutError => Err(Error::DbTimeout),
// TODO use `msg`
QueryError::DbError(e, _msg) => match e {
DbError::Overloaded => Err(Error::DbOverload),
_ => Err(e.into()),
},
_ => Err(e.into()),
},
}
})
.await
InsertFut::new(scy, qu, params)
}
#[pin_project::pin_project]
pub struct InsertFut<F> {
fn insert_array_gen_fut<ST>(par: InsParCom, val: ST, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut
where
ST: scylla::frame::value::Value + Send + 'static,
{
let params = (
par.series as i64,
par.ts_msp as i64,
par.ts_lsp as i64,
par.pulse as i64,
val,
par.ttl as i32,
);
InsertFut::new(scy, qu, params)
}
pub struct InsertFut {
#[allow(unused)]
scy: Arc<ScySession>,
#[allow(unused)]
qu: Arc<PreparedStatement>,
fut: F,
fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send>>,
}
impl<F> InsertFut<F> {
pub fn new(scy: Arc<ScySession>, qu: Arc<PreparedStatement>, fut: F) -> Self {
impl InsertFut {
pub fn new<V: scylla::frame::value::ValueList + Send + 'static>(
scy: Arc<ScySession>,
qu: Arc<PreparedStatement>,
params: V,
) -> Self {
let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() };
let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() };
let fut = scy_ref.execute_paged(qu_ref, params, None);
let fut = taskrun::tokio::task::unconstrained(fut);
let fut = Box::pin(fut);
Self { scy, qu, fut }
}
}
impl<F> Future for InsertFut<F> {
type Output = Result<(), Error>;
impl Future for InsertFut {
type Output = Result<QueryResult, QueryError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
todo!()
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.fut.poll_unpin(cx)
}
}
@@ -468,47 +466,19 @@ pub async fn insert_item(
Ok(())
}
pub async fn insert_msp_fut(
pub fn insert_msp_fut(
series: SeriesId,
ts_msp: u64,
ttls: &Ttls,
scy: &ScySession,
qu: &PreparedStatement,
) -> Result<(), Error> {
scy: Arc<ScySession>,
qu: Arc<PreparedStatement>,
) -> InsertFut {
let params = (series.id() as i64, ts_msp as i64, ttls.index.as_secs() as i32);
scy.execute(qu, params)
.map(|item| {
match item {
Ok(_) => Ok(()),
Err(e) => match e {
QueryError::TimeoutError => Err(Error::DbTimeout),
// TODO use `msg`
QueryError::DbError(e, _msg) => match e {
DbError::Overloaded => Err(Error::DbOverload),
_ => Err(e.into()),
},
_ => Err(e.into()),
},
}
})
.await
InsertFut::new(scy, qu, params)
}
pub fn insert_item_fut(
item: InsertItem,
ttls: &Ttls,
data_store: &DataStore,
do_insert: bool,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
let par = InsParCom {
series: item.series.id(),
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
pulse: item.pulse,
ttl: ttls.d0.as_secs() as _,
do_insert,
};
let scy = &data_store.scy;
pub fn insert_item_fut(item: InsertItem, ttls: &Ttls, data_store: &DataStore, do_insert: bool) -> InsertFut {
let scy = data_store.scy.clone();
use DataValue::*;
match item.val {
Scalar(val) => {
@@ -522,14 +492,14 @@ pub fn insert_item_fut(
};
use ScalarValue::*;
match val {
I8(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i8, scy),
I16(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i16, scy),
Enum(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i16, scy),
I32(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i32, scy),
F32(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_f32, scy),
F64(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_f64, scy),
String(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_string, scy),
Bool(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_bool, scy),
I8(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i8.clone(), scy),
I16(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy),
Enum(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy),
I32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i32.clone(), scy),
F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy),
F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy),
String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy),
Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy),
}
}
Array(val) => {
@@ -543,33 +513,87 @@ pub fn insert_item_fut(
};
use ArrayValue::*;
match val {
// I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?,
// I16(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i16, &data_store).await?,
// I32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i32, &data_store).await?,
// F32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f32, &data_store).await?,
// F64(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f64, &data_store).await?,
// Bool(val) => insert_array_gen(par, val, &data_store.qu_insert_array_bool, &data_store).await?,
_ => Box::pin(futures_util::future::ready(Ok(()))),
I8(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_i8.clone(), scy),
I16(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_i16.clone(), scy),
I32(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_i32.clone(), scy),
F32(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_f32.clone(), scy),
F64(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_f64.clone(), scy),
Bool(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_bool.clone(), scy),
}
}
}
// let val: i32 = 4242;
// insert_scalar_gen_fut(
// par,
// val,
// data_store.qu_insert_scalar_i32.clone(),
// data_store.scy.clone(),
// )
}
/// Build the (not yet awaited) db insert future for one connection status
/// event. The event timestamp is split into a coarse msp bucket and the lsp
/// offset within that bucket before being bound as statement parameters.
pub fn insert_connection_status_fut(item: ConnectionStatusItem, ttls: &Ttls, data_store: &DataStore) -> InsertFut {
    // Nanoseconds since the unix epoch; a pre-epoch clock collapses to zero.
    let since_epoch = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
    let ts_ns = since_epoch.as_secs() * SEC + since_epoch.subsec_nanos() as u64;
    let msp = ts_ns / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
    let lsp = ts_ns - msp;
    let params = (
        msp as i64,
        lsp as i64,
        item.status.to_kind() as i32,
        item.addr.to_string(),
        ttls.index.as_secs() as i32,
    );
    InsertFut::new(
        data_store.scy.clone(),
        data_store.qu_insert_connection_status.clone(),
        params,
    )
}
/// Build the db insert futures for one channel status event.
///
/// Returns two futures: one insert keyed by series and one into the
/// by-ts_msp table, so the same event can be looked up either way. Both must
/// be awaited by the caller.
pub fn insert_channel_status_fut(
    item: ChannelStatusItem,
    ttls: &Ttls,
    data_store: &DataStore,
) -> SmallVec<[InsertFut; 4]> {
    // Nanoseconds since the unix epoch; a pre-epoch clock collapses to zero.
    let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
    let secs = tsunix.as_secs() * SEC;
    let nanos = tsunix.subsec_nanos() as u64;
    let ts = secs + nanos;
    // Split into coarse msp bucket and lsp offset within the bucket.
    let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
    let ts_lsp = ts - ts_msp;
    let kind = item.status.to_kind();
    let series = item.series.id();
    // Series-keyed row. NOTE(review): tuple order presumably matches the
    // column order of qu_insert_channel_status — confirm against the schema.
    let params = (
        series as i64,
        ts_msp as i64,
        ts_lsp as i64,
        kind as i32,
        ttls.index.as_secs() as i32,
    );
    let fut1 = InsertFut::new(
        data_store.scy.clone(),
        data_store.qu_insert_channel_status.clone(),
        params,
    );
    // Same event for the by-ts_msp table; note the different field order
    // (msp/lsp first, series third) — do not reorder.
    let params = (
        ts_msp as i64,
        ts_lsp as i64,
        series as i64,
        kind as i32,
        ttls.index.as_secs() as i32,
    );
    let fut2 = InsertFut::new(
        data_store.scy.clone(),
        data_store.qu_insert_channel_status_by_ts_msp.clone(),
        params,
    );
    smallvec![fut1, fut2]
}
pub async fn insert_connection_status(
item: ConnectionStatusItem,
ttl: Duration,
data_store: &DataStore,
_stats: &InsertWorkerStats,
) -> Result<(), Error> {
let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
let secs = tsunix.as_secs() * netpod::timeunits::SEC;
let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
let secs = tsunix.as_secs() * SEC;
let nanos = tsunix.subsec_nanos() as u64;
let ts = secs + nanos;
let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
@@ -588,10 +612,9 @@ pub async fn insert_channel_status(
item: ChannelStatusItem,
ttl: Duration,
data_store: &DataStore,
_stats: &InsertWorkerStats,
) -> Result<(), Error> {
let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
let secs = tsunix.as_secs() * netpod::timeunits::SEC;
let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
let secs = tsunix.as_secs() * SEC;
let nanos = tsunix.subsec_nanos() as u64;
let ts = secs + nanos;
let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;

View File

@@ -8,6 +8,7 @@ pub mod futinsert;
pub mod futinsertloop;
pub mod insertworker;
pub mod iteminsertqueue;
pub mod ratelimit;
pub mod schema;
pub mod session;
pub mod store;

68
scywr/src/ratelimit.rs Normal file
View File

@@ -0,0 +1,68 @@
use async_channel::Receiver;
use async_channel::Sender;
use log::*;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
// TODO allow a trait to tell worker whether drop is allowed or not.
/// Forwards items from `inp` to `tx`, dropping droppable items whenever the
/// smoothed inter-arrival interval falls below the minimum implied by `rate`
/// (items per second, loaded atomically so it can be tuned at runtime).
///
/// Runs until either channel is closed. `allowed_to_drop` is currently
/// hard-wired to `false`, so every item is forwarded; the drop branch is kept
/// for when a trait lets items declare droppability (see TODO above).
async fn rate_limiter_worker<T>(
    rate: Arc<AtomicU64>,
    inp: Receiver<T>,
    tx: Sender<T>,
    // stats: Arc<stats::InsertWorkerStats>,
) {
    let mut ts_forward_last = Instant::now();
    // Slow EMA over the inter-forward interval in nanoseconds.
    let mut ivl_ema = stats::Ema64::with_k(0.00001);
    loop {
        let item = if let Ok(x) = inp.recv().await {
            x
        } else {
            break;
        };
        let ts_received = Instant::now();
        let allowed_to_drop = false;
        let dt_min = {
            // Clamp to at least 1: a configured rate of 0 would otherwise
            // panic here with a division by zero.
            let rate2 = rate.load(Ordering::Acquire).max(1);
            Duration::from_nanos(SEC / rate2)
        };
        // Probe what the EMA would become if this item were forwarded now,
        // without committing the update in case the item is dropped.
        let mut ema2 = ivl_ema.clone();
        {
            let dt = ts_received.duration_since(ts_forward_last);
            let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
            // Cap at 100 ms so long idle gaps do not dominate the average.
            ema2.update(dt_ns.min(MS * 100) as f32);
        }
        let ivl2 = Duration::from_nanos(ema2.ema() as u64);
        if allowed_to_drop && ivl2 < dt_min {
            //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
            // stats.ratelimit_drop().inc();
        } else {
            if tx.send(item).await.is_err() {
                break;
            } else {
                // Commit the interval observation only for forwarded items.
                let tsnow = Instant::now();
                let dt = tsnow.duration_since(ts_forward_last);
                let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
                ivl_ema.update(dt_ns.min(MS * 100) as f32);
                ts_forward_last = tsnow;
                // stats.inter_ivl_ema.set(ivl_ema.ema() as u64);
            }
        }
    }
    info!("rate limiter done");
}
/// Spawn the rate limiting worker reading from `inp` and hand back the
/// receiving end of the bounded channel it forwards into.
pub fn rate_limiter<T: Send + 'static>(
    rate: Arc<AtomicU64>,
    inp: Receiver<T>,
    // stats: Arc<stats::InsertWorkerStats>,
) -> Receiver<T> {
    // Mirror the input channel's capacity on the output side; fall back to a
    // fixed bound when the input channel is unbounded.
    let cap = inp.capacity().unwrap_or(256);
    let (out_tx, out_rx) = async_channel::bounded(cap);
    taskrun::spawn(rate_limiter_worker(rate, inp, out_tx));
    out_rx
}

View File

@@ -209,6 +209,11 @@ impl IntervalEma {
// #[cfg(DISABLED)]
stats_proc::stats_struct!((
stats_struct(
name(CaProtoStats),
prefix(ca_proto),
counters(tcp_recv_count, tcp_recv_bytes,),
),
stats_struct(
name(CaConnSetStats),
counters(