From 5fa77acf5cbf47b2036cd27f7ab584f6e0ba6f25 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 19 Sep 2023 22:27:22 +0200 Subject: [PATCH] Alternative insert queue worker --- daqingest/src/daemon.rs | 1 + netfetch/src/ca/conn.rs | 12 +- netfetch/src/ca/connset.rs | 15 ++- netfetch/src/ca/proto.rs | 14 +- netfetch/src/metrics.rs | 11 +- scywr/src/insertworker.rs | 252 +++++++++++++++-------------------- scywr/src/iteminsertqueue.rs | 237 +++++++++++++++++--------------- scywr/src/lib.rs | 1 + scywr/src/ratelimit.rs | 68 ++++++++++ stats/src/stats.rs | 5 + 10 files changed, 353 insertions(+), 263 deletions(-) create mode 100644 scywr/src/ratelimit.rs diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 23344bd..bb45de7 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -619,6 +619,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> daemon_stats, conn_set_stats, ca_conn_stats, + daemon.connset_ctrl.ca_proto_stats().clone(), daemon.insert_worker_stats.clone(), daemon.series_by_channel_stats.clone(), insert_frac, diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index d7232c1..f1d1c85 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -40,6 +40,7 @@ use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; use stats::CaConnStats; +use stats::CaProtoStats; use stats::IntervalEma; use std::collections::BTreeMap; use std::collections::VecDeque; @@ -502,8 +503,10 @@ pub struct CaConn { channel_info_query_sending: SenderPolling, time_binners: BTreeMap, thr_msg_poll: ThrottleTrace, + ca_proto_stats: Arc, } +#[cfg(DISABLED)] impl Drop for CaConn { fn drop(&mut self) { debug!("~~~~~~~~~~~~~~~ Drop CaConn {}", self.remote_addr_dbg); @@ -519,6 +522,7 @@ impl CaConn { storage_insert_tx: Sender, channel_info_query_tx: Sender, stats: Arc, + ca_proto_stats: Arc, ) -> Self { let (cq_tx, cq_rx) = async_channel::bounded(32); Self { @@ -554,6 +558,7 @@ impl CaConn { channel_info_query_sending: SenderPolling::new(channel_info_query_tx), time_binners: BTreeMap::new(), thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)), + ca_proto_stats, } } @@ -1614,7 +1619,12 @@ impl CaConn { status: ConnectionStatus::Established, })); self.backoff_reset(); - let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.opts.array_truncate); + let proto = CaProto::new( + tcp, + self.remote_addr_dbg.clone(), + self.opts.array_truncate, + self.ca_proto_stats.clone(), + ); self.state = CaConnState::Init; self.proto = Some(proto); Ok(Ready(Some(()))) diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 9973191..08c6c45 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -47,6 +47,7 @@ use statemap::WithStatusSeriesIdStateInner; use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; use stats::CaConnSetStats; use stats::CaConnStats; +use stats::CaProtoStats; use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::SocketAddr; @@ -214,6 +215,7 @@ pub struct CaConnSetCtrl { rx: Receiver, stats: Arc, ca_conn_stats: Arc, + ca_proto_stats: Arc, jh: JoinHandle>, } @@ -268,6 +270,10 @@ impl CaConnSetCtrl { pub fn ca_conn_stats(&self) -> &Arc { &self.ca_conn_stats } + + pub fn ca_proto_stats(&self) -> &Arc { + &self.ca_proto_stats + } } #[derive(Debug)] @@ -323,6 +329,7 @@ pub struct CaConnSet { thr_msg_poll_1: ThrottleTrace, thr_msg_storage_len: ThrottleTrace, did_connset_out_queue: bool, + ca_proto_stats: Arc, } impl CaConnSet { @@ -341,10 +348,8 @@ impl CaConnSet { super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf); let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400); let stats = Arc::new(CaConnSetStats::new()); + let ca_proto_stats = Arc::new(CaProtoStats::new()); let ca_conn_stats = Arc::new(CaConnStats::new()); - stats.test_1().inc(); - stats.test_1().inc(); - stats.test_1().inc(); let connset = Self { backend, local_epics_hostname, @@ -378,6 +383,7 @@ impl CaConnSet { thr_msg_poll_1: ThrottleTrace::new(Duration::from_millis(2000)), thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)), did_connset_out_queue: false, + ca_proto_stats: ca_proto_stats.clone(), }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); @@ -386,6 +392,7 @@ impl CaConnSet { rx: connset_out_rx, stats, ca_conn_stats, + ca_proto_stats, jh, } } @@ -766,6 +773,7 @@ impl CaConnSet { self.storage_insert_tx.clone(), self.channel_info_query_tx.clone(), self.ca_conn_stats.clone(), + self.ca_proto_stats.clone(), ); let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); @@ -817,6 +825,7 @@ impl CaConnSet { Err(e) => { error!("CaConn gives error: {e:?}"); ret = Err(e); + break; } } } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index cceb5c8..54c26e4 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -6,6 +6,7 @@ use futures_util::Stream; use log::*; use netpod::timeunits::*; use slidebuf::SlideBuf; +use stats::CaProtoStats; use std::collections::BTreeMap; use std::collections::VecDeque; use std::io; @@ -13,6 +14,7 @@ use std::net::SocketAddrV4; use std::num::NonZeroU16; use std::num::NonZeroU64; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use taskrun::tokio; @@ -948,19 +950,21 @@ pub struct CaProto { out: VecDeque, array_truncate: usize, logged_proto_error_for_cid: BTreeMap, + stats: Arc, } impl CaProto { - pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, array_truncate: usize) -> Self { + pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, array_truncate: usize, stats: Arc) -> Self { Self { tcp, remote_addr_dbg, state: CaState::StdHead, - buf: SlideBuf::new(1024 * 128), + buf: SlideBuf::new(1024 * 512), outbuf: SlideBuf::new(1024 * 128), out: VecDeque::new(), array_truncate, logged_proto_error_for_cid: BTreeMap::new(), + stats, } } @@ -1083,7 +1087,11 @@ impl CaProto { info!("received data {:?}", &rbuf.filled()[0..t]); } match self.buf.wadv(nf) { - Ok(()) => Ok(Some(Ready(CaItem::empty()))), + Ok(()) => { + self.stats.tcp_recv_bytes().add(nf as _); + self.stats.tcp_recv_count().inc(); + Ok(Some(Ready(CaItem::empty()))) + } Err(e) => { error!("netbuf wadv fail nf {nf}"); Err(e.into()) diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 98315ce..b050561 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -18,6 +18,7 @@ use stats::CaConnSetStats; use stats::CaConnStats; use stats::CaConnStatsAgg; use stats::CaConnStatsAggDiff; +use stats::CaProtoStats; use stats::DaemonStats; use stats::InsertWorkerStats; use stats::SeriesByChannelStats; @@ -33,6 +34,7 @@ pub struct StatsSet { daemon: Arc, ca_conn_set: Arc, ca_conn: Arc, + ca_proto: Arc, insert_worker_stats: Arc, series_by_channel_stats: Arc, insert_frac: Arc, @@ -43,6 +45,7 @@ impl StatsSet { daemon: Arc, ca_conn_set: Arc, ca_conn: Arc, + ca_proto: Arc, insert_worker_stats: Arc, series_by_channel_stats: Arc, insert_frac: Arc, @@ -51,6 +54,7 @@ impl StatsSet { daemon, ca_conn_set, ca_conn, + ca_proto, insert_worker_stats, series_by_channel_stats, insert_frac, @@ -215,11 +219,8 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st let s3 = stats_set.insert_worker_stats.prometheus(); let s4 = stats_set.ca_conn.prometheus(); let s5 = stats_set.series_by_channel_stats.prometheus(); - s1.push_str(&s2); - s1.push_str(&s3); - s1.push_str(&s4); - s1.push_str(&s5); - s1 + let s6 = stats_set.ca_proto.prometheus(); + [s1, s2, s3, s4, s5, s6].join("") } }), ) diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index e62adce..fbc8eba 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -1,9 +1,13 @@ use crate::iteminsertqueue::insert_channel_status; +use crate::iteminsertqueue::insert_channel_status_fut; use crate::iteminsertqueue::insert_connection_status; +use crate::iteminsertqueue::insert_connection_status_fut; use crate::iteminsertqueue::insert_item; use crate::iteminsertqueue::insert_item_fut; use crate::iteminsertqueue::insert_msp_fut; +use crate::iteminsertqueue::ConnectionStatusItem; use crate::iteminsertqueue::InsertFut; +use crate::iteminsertqueue::InsertItem; use crate::iteminsertqueue::QueryItem; use crate::store::DataStore; use async_channel::Receiver; @@ -15,6 +19,8 @@ use log::*; use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::ScyllaConfig; +use smallvec::smallvec; +use smallvec::SmallVec; use stats::InsertWorkerStats; use std::pin::Pin; use std::sync::atomic; @@ -88,65 +94,6 @@ pub struct InsertWorkerOpts { pub array_truncate: Arc, } -async fn rate_limiter_worker( - rate: Arc, - inp: Receiver, - tx: Sender, - stats: Arc, -) { - let mut ts_forward_last = Instant::now(); - let mut ivl_ema = stats::Ema64::with_k(0.00001); - loop { - let item = if let Ok(x) = inp.recv().await { - x - } else { - break; - }; - let ts_received = Instant::now(); - let allowed_to_drop = match &item { - QueryItem::Insert(_) => true, - _ => false, - }; - let dt_min = { - let rate2 = rate.load(Ordering::Acquire); - Duration::from_nanos(SEC / rate2) - }; - let mut ema2 = ivl_ema.clone(); - { - let dt = ts_received.duration_since(ts_forward_last); - let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; - ema2.update(dt_ns.min(MS * 100) as f32); - } - let ivl2 = Duration::from_nanos(ema2.ema() as u64); - if allowed_to_drop && ivl2 < dt_min { - //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; - stats.ratelimit_drop().inc(); - } else { - if tx.send(item).await.is_err() { - break; - } else { - let tsnow = Instant::now(); - let dt = tsnow.duration_since(ts_forward_last); - let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; - ivl_ema.update(dt_ns.min(MS * 100) as f32); - ts_forward_last = tsnow; - // stats.inter_ivl_ema.set(ivl_ema.ema() as u64); - } - } - } - info!("rate limiter done"); -} - -fn rate_limiter( - inp: Receiver, - opts: Arc, - stats: Arc, -) -> Receiver { - let (tx, rx) = async_channel::bounded(inp.capacity().unwrap_or(256)); - tokio::spawn(rate_limiter_worker(opts.store_workers_rate.clone(), inp, tx, stats)); - rx -} - pub async fn spawn_scylla_insert_workers( scyconf: ScyllaConfig, insert_scylla_sessions: usize, @@ -158,7 +105,7 @@ pub async fn spawn_scylla_insert_workers( ttls: Ttls, ) -> Result>>, Error> { let item_inp = if use_rate_limit_queue { - rate_limiter(item_inp, insert_worker_opts.clone(), store_stats.clone()) + crate::ratelimit::rate_limiter(insert_worker_opts.store_workers_rate.clone(), item_inp) } else { item_inp }; @@ -170,6 +117,7 @@ pub async fn spawn_scylla_insert_workers( } for worker_ix in 0..insert_worker_count { let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone(); + #[cfg(DISABLED)] let jh = tokio::spawn(worker( worker_ix, item_inp.clone(), @@ -178,20 +126,21 @@ pub async fn spawn_scylla_insert_workers( data_store, store_stats.clone(), )); - // let jh = tokio::spawn(worker_streamed( - // worker_ix, - // insert_worker_count * 3, - // item_inp.clone(), - // ttls.clone(), - // insert_worker_opts.clone(), - // data_store, - // store_stats.clone(), - // )); + let jh = tokio::spawn(worker_streamed( + worker_ix, + insert_worker_count * 3, + item_inp.clone(), + ttls.clone(), + insert_worker_opts.clone(), + data_store, + store_stats.clone(), + )); jhs.push(jh); } Ok(jhs) } +#[allow(unused)] async fn worker( worker_ix: usize, item_inp: Receiver, @@ -215,30 +164,26 @@ async fn worker( break; }; match item { - QueryItem::ConnectionStatus(item) => { - match insert_connection_status(item, ttls.index, &data_store, &stats).await { - Ok(_) => { - stats.inserted_connection_status().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } + QueryItem::ConnectionStatus(item) => match insert_connection_status(item, ttls.index, &data_store).await { + Ok(_) => { + stats.inserted_connection_status().inc(); + backoff = backoff_0; } - } - QueryItem::ChannelStatus(item) => { - match insert_channel_status(item, ttls.index, &data_store, &stats).await { - Ok(_) => { - stats.inserted_channel_status().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } + Err(e) => { + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; } - } + }, + QueryItem::ChannelStatus(item) => match insert_channel_status(item, ttls.index, &data_store).await { + Ok(_) => { + stats.inserted_channel_status().inc(); + backoff = backoff_0; + } + Err(e) => { + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; + } + }, QueryItem::Insert(item) => { let item_ts_local = item.ts_local.clone(); let tsnow = { @@ -420,73 +365,49 @@ async fn worker_streamed( .insert_workers_running .fetch_add(1, atomic::Ordering::AcqRel); let mut stream = item_inp - .map(|item| match item { - QueryItem::Insert(item) => { - stats.item_recv.inc(); - // let mut futs: smallvec::SmallVec< - // [Pin> + Send>>; 4], - // > = smallvec::smallvec![]; - let mut futs: Vec> + Send>>> = - Vec::new(); - if item.msp_bump { - stats.inserts_msp().inc(); - let fut = insert_msp_fut( - item.series.clone(), - item.ts_msp, - &ttls, - &data_store.scy, - &data_store.qu_insert_ts_msp, - ); - // futs.push(Box::pin(fut)); + .map(|item| { + stats.item_recv.inc(); + match item { + QueryItem::Insert(item) => prepare_query_insert_futs(item, &ttls, &data_store, &stats), + QueryItem::ConnectionStatus(item) => { + stats.inserted_connection_status().inc(); + let fut = insert_connection_status_fut(item, &ttls, &data_store); + smallvec![fut] } - #[cfg(DISABLED)] - if let Some(ts_msp_grid) = item.ts_msp_grid { - let params = ( - (item.series.id() as i32) & 0xff, - ts_msp_grid as i32, - if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32, - item.scalar_type.to_scylla_i32(), - item.series.id() as i64, - ttls.index.as_secs() as i32, - ); - data_store - .scy - .execute(&data_store.qu_insert_series_by_ts_msp, params) - .await?; - stats.inserts_msp_grid().inc(); + QueryItem::ChannelStatus(item) => { + stats.inserted_channel_status().inc(); + insert_channel_status_fut(item, &ttls, &data_store) + } + _ => { + // TODO + SmallVec::new() } - let do_insert = true; - // TODO prepare db future and pass-through. - stats.inserts_value().inc(); - let fut = insert_item_fut(item, &ttls, &data_store, do_insert); - // .map_err(|e| Error::with_msg_no_trace(e.to_string())) - let fut = tokio::task::unconstrained(fut); - // futs.push(Box::pin(fut)); - futs - } - _ => { - // TODO - Vec::new() } }) - .map(|mut x| async move { x.pop().unwrap().await }) - // .map(|x| futures_util::stream::iter(x)) - // .flatten_unordered(None) - .buffer_unordered(concurrency) - .map(|x| x); - + .map(|x| futures_util::stream::iter(x)) + .flatten_unordered(Some(1)) + .buffer_unordered(concurrency); while let Some(item) = stream.next().await { match item { - Ok(()) => { + Ok(_) => { stats.inserted_values().inc(); // TODO compute the insert latency bin and count. } Err(e) => { + use scylla::transport::errors::QueryError; + let e = match e { + QueryError::TimeoutError => crate::iteminsertqueue::Error::DbTimeout, + // TODO use `msg` + QueryError::DbError(e, _msg) => match e { + scylla::transport::errors::DbError::Overloaded => crate::iteminsertqueue::Error::DbOverload, + _ => e.into(), + }, + _ => e.into(), + }; stats_inc_for_err(&stats, &e); } } } - stats.worker_finish().inc(); insert_worker_opts .insert_workers_running @@ -494,3 +415,46 @@ async fn worker_streamed( trace2!("insert worker {worker_ix} done"); Ok(()) } + +fn prepare_query_insert_futs( + item: InsertItem, + ttls: &Ttls, + data_store: &Arc, + stats: &InsertWorkerStats, +) -> SmallVec<[InsertFut; 4]> { + stats.inserts_value().inc(); + let msp_bump = item.msp_bump; + let series = item.series.clone(); + let ts_msp = item.ts_msp; + let do_insert = true; + let fut = insert_item_fut(item, &ttls, &data_store, do_insert); + let mut futs = smallvec![fut]; + if msp_bump { + stats.inserts_msp().inc(); + let fut = insert_msp_fut( + series, + ts_msp, + ttls, + data_store.scy.clone(), + data_store.qu_insert_ts_msp.clone(), + ); + futs.push(fut); + } + #[cfg(DISABLED)] + if let Some(ts_msp_grid) = item.ts_msp_grid { + let params = ( + (item.series.id() as i32) & 0xff, + ts_msp_grid as i32, + if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32, + item.scalar_type.to_scylla_i32(), + item.series.id() as i64, + ttls.index.as_secs() as i32, + ); + data_store + .scy + .execute(&data_store.qu_insert_series_by_ts_msp, params) + .await?; + stats.inserts_msp_grid().inc(); + } + futs +} diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 4a7357c..d72c152 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -7,21 +7,26 @@ use err::thiserror; use err::ThisError; use futures_util::Future; use futures_util::FutureExt; -use futures_util::TryFutureExt; +use netpod::timeunits::SEC; use netpod::ScalarType; use netpod::Shape; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; +use scylla::QueryResult; use series::SeriesId; +use smallvec::smallvec; +use smallvec::SmallVec; use stats::InsertWorkerStats; use std::net::SocketAddrV4; use std::pin::Pin; +use std::ptr::NonNull; use std::sync::Arc; use std::task::Context; use std::task::Poll; use std::time::Duration; use std::time::SystemTime; +use std::time::UNIX_EPOCH; #[derive(Debug, ThisError)] pub enum Error { @@ -254,26 +259,9 @@ struct InsParCom { do_insert: bool, } -fn insert_scalar_gen_fut( - par: InsParCom, - val: ST, - qu: &Arc, - scy: &Arc, -) -> Pin> + Send>> +fn insert_scalar_gen_fut(par: InsParCom, val: ST, qu: Arc, scy: Arc) -> InsertFut where ST: scylla::frame::value::Value + Send + 'static, -{ - Box::pin(insert_scalar_gen_fut_inner(par, val, qu.clone(), scy.clone())) -} - -async fn insert_scalar_gen_fut_inner( - par: InsParCom, - val: ST, - qu: Arc, - scy: Arc, -) -> Result<(), Error> -where - ST: scylla::frame::value::Value, { let params = ( par.series as i64, @@ -283,42 +271,52 @@ where val, par.ttl as i32, ); - scy.execute(&qu, params) - .map(|item| { - match item { - Ok(_) => Ok(()), - Err(e) => match e { - QueryError::TimeoutError => Err(Error::DbTimeout), - // TODO use `msg` - QueryError::DbError(e, _msg) => match e { - DbError::Overloaded => Err(Error::DbOverload), - _ => Err(e.into()), - }, - _ => Err(e.into()), - }, - } - }) - .await + InsertFut::new(scy, qu, params) } -#[pin_project::pin_project] -pub struct InsertFut { +fn insert_array_gen_fut(par: InsParCom, val: ST, qu: Arc, scy: Arc) -> InsertFut +where + ST: scylla::frame::value::Value + Send + 'static, +{ + let params = ( + par.series as i64, + par.ts_msp as i64, + par.ts_lsp as i64, + par.pulse as i64, + val, + par.ttl as i32, + ); + InsertFut::new(scy, qu, params) +} + +pub struct InsertFut { + #[allow(unused)] scy: Arc, + #[allow(unused)] qu: Arc, - fut: F, + fut: Pin> + Send>>, } -impl InsertFut { - pub fn new(scy: Arc, qu: Arc, fut: F) -> Self { +impl InsertFut { + pub fn new( + scy: Arc, + qu: Arc, + params: V, + ) -> Self { + let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() }; + let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() }; + let fut = scy_ref.execute_paged(qu_ref, params, None); + let fut = taskrun::tokio::task::unconstrained(fut); + let fut = Box::pin(fut); Self { scy, qu, fut } } } -impl Future for InsertFut { - type Output = Result<(), Error>; +impl Future for InsertFut { + type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - todo!() + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.fut.poll_unpin(cx) } } @@ -468,47 +466,19 @@ pub async fn insert_item( Ok(()) } -pub async fn insert_msp_fut( +pub fn insert_msp_fut( series: SeriesId, ts_msp: u64, ttls: &Ttls, - scy: &ScySession, - qu: &PreparedStatement, -) -> Result<(), Error> { + scy: Arc, + qu: Arc, +) -> InsertFut { let params = (series.id() as i64, ts_msp as i64, ttls.index.as_secs() as i32); - scy.execute(qu, params) - .map(|item| { - match item { - Ok(_) => Ok(()), - Err(e) => match e { - QueryError::TimeoutError => Err(Error::DbTimeout), - // TODO use `msg` - QueryError::DbError(e, _msg) => match e { - DbError::Overloaded => Err(Error::DbOverload), - _ => Err(e.into()), - }, - _ => Err(e.into()), - }, - } - }) - .await + InsertFut::new(scy, qu, params) } -pub fn insert_item_fut( - item: InsertItem, - ttls: &Ttls, - data_store: &DataStore, - do_insert: bool, -) -> Pin> + Send>> { - let par = InsParCom { - series: item.series.id(), - ts_msp: item.ts_msp, - ts_lsp: item.ts_lsp, - pulse: item.pulse, - ttl: ttls.d0.as_secs() as _, - do_insert, - }; - let scy = &data_store.scy; +pub fn insert_item_fut(item: InsertItem, ttls: &Ttls, data_store: &DataStore, do_insert: bool) -> InsertFut { + let scy = data_store.scy.clone(); use DataValue::*; match item.val { Scalar(val) => { @@ -522,14 +492,14 @@ pub fn insert_item_fut( }; use ScalarValue::*; match val { - I8(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i8, scy), - I16(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i16, scy), - Enum(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i16, scy), - I32(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_i32, scy), - F32(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_f32, scy), - F64(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_f64, scy), - String(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_string, scy), - Bool(val) => insert_scalar_gen_fut(par, val, &data_store.qu_insert_scalar_bool, scy), + I8(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i8.clone(), scy), + I16(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy), + Enum(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy), + I32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i32.clone(), scy), + F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy), + F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy), + String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy), + Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy), } } Array(val) => { @@ -543,33 +513,87 @@ pub fn insert_item_fut( }; use ArrayValue::*; match val { - // I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?, - // I16(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i16, &data_store).await?, - // I32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i32, &data_store).await?, - // F32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f32, &data_store).await?, - // F64(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f64, &data_store).await?, - // Bool(val) => insert_array_gen(par, val, &data_store.qu_insert_array_bool, &data_store).await?, - _ => Box::pin(futures_util::future::ready(Ok(()))), + I8(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_i8.clone(), scy), + I16(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_i16.clone(), scy), + I32(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_i32.clone(), scy), + F32(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_f32.clone(), scy), + F64(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_f64.clone(), scy), + Bool(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_bool.clone(), scy), } } } - // let val: i32 = 4242; - // insert_scalar_gen_fut( - // par, - // val, - // data_store.qu_insert_scalar_i32.clone(), - // data_store.scy.clone(), - // ) +} + +pub fn insert_connection_status_fut(item: ConnectionStatusItem, ttls: &Ttls, data_store: &DataStore) -> InsertFut { + let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); + let secs = tsunix.as_secs() * SEC; + let nanos = tsunix.subsec_nanos() as u64; + let ts = secs + nanos; + let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; + let ts_lsp = ts - ts_msp; + let kind = item.status.to_kind(); + let addr = format!("{}", item.addr); + let params = ( + ts_msp as i64, + ts_lsp as i64, + kind as i32, + addr, + ttls.index.as_secs() as i32, + ); + InsertFut::new( + data_store.scy.clone(), + data_store.qu_insert_connection_status.clone(), + params, + ) +} + +pub fn insert_channel_status_fut( + item: ChannelStatusItem, + ttls: &Ttls, + data_store: &DataStore, +) -> SmallVec<[InsertFut; 4]> { + let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); + let secs = tsunix.as_secs() * SEC; + let nanos = tsunix.subsec_nanos() as u64; + let ts = secs + nanos; + let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; + let ts_lsp = ts - ts_msp; + let kind = item.status.to_kind(); + let series = item.series.id(); + let params = ( + series as i64, + ts_msp as i64, + ts_lsp as i64, + kind as i32, + ttls.index.as_secs() as i32, + ); + let fut1 = InsertFut::new( + data_store.scy.clone(), + data_store.qu_insert_channel_status.clone(), + params, + ); + let params = ( + ts_msp as i64, + ts_lsp as i64, + series as i64, + kind as i32, + ttls.index.as_secs() as i32, + ); + let fut2 = InsertFut::new( + data_store.scy.clone(), + data_store.qu_insert_channel_status_by_ts_msp.clone(), + params, + ); + smallvec![fut1, fut2] } pub async fn insert_connection_status( item: ConnectionStatusItem, ttl: Duration, data_store: &DataStore, - _stats: &InsertWorkerStats, ) -> Result<(), Error> { - let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); - let secs = tsunix.as_secs() * netpod::timeunits::SEC; + let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); + let secs = tsunix.as_secs() * SEC; let nanos = tsunix.subsec_nanos() as u64; let ts = secs + nanos; let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; @@ -588,10 +612,9 @@ pub async fn insert_channel_status( item: ChannelStatusItem, ttl: Duration, data_store: &DataStore, - _stats: &InsertWorkerStats, ) -> Result<(), Error> { - let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); - let secs = tsunix.as_secs() * netpod::timeunits::SEC; + let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); + let secs = tsunix.as_secs() * SEC; let nanos = tsunix.subsec_nanos() as u64; let ts = secs + nanos; let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; diff --git a/scywr/src/lib.rs b/scywr/src/lib.rs index bd66553..38b04f6 100644 --- a/scywr/src/lib.rs +++ b/scywr/src/lib.rs @@ -8,6 +8,7 @@ pub mod futinsert; pub mod futinsertloop; pub mod insertworker; pub mod iteminsertqueue; +pub mod ratelimit; pub mod schema; pub mod session; pub mod store; diff --git a/scywr/src/ratelimit.rs b/scywr/src/ratelimit.rs new file mode 100644 index 0000000..7425ac4 --- /dev/null +++ b/scywr/src/ratelimit.rs @@ -0,0 +1,68 @@ +use async_channel::Receiver; +use async_channel::Sender; +use log::*; +use netpod::timeunits::MS; +use netpod::timeunits::SEC; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; + +// TODO allow a trait to tell worker whether drop is allowed or not. + +async fn rate_limiter_worker( + rate: Arc, + inp: Receiver, + tx: Sender, + // stats: Arc, +) { + let mut ts_forward_last = Instant::now(); + let mut ivl_ema = stats::Ema64::with_k(0.00001); + loop { + let item = if let Ok(x) = inp.recv().await { + x + } else { + break; + }; + let ts_received = Instant::now(); + let allowed_to_drop = false; + let dt_min = { + let rate2 = rate.load(Ordering::Acquire); + Duration::from_nanos(SEC / rate2) + }; + let mut ema2 = ivl_ema.clone(); + { + let dt = ts_received.duration_since(ts_forward_last); + let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; + ema2.update(dt_ns.min(MS * 100) as f32); + } + let ivl2 = Duration::from_nanos(ema2.ema() as u64); + if allowed_to_drop && ivl2 < dt_min { + //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; + // stats.ratelimit_drop().inc(); + } else { + if tx.send(item).await.is_err() { + break; + } else { + let tsnow = Instant::now(); + let dt = tsnow.duration_since(ts_forward_last); + let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; + ivl_ema.update(dt_ns.min(MS * 100) as f32); + ts_forward_last = tsnow; + // stats.inter_ivl_ema.set(ivl_ema.ema() as u64); + } + } + } + info!("rate limiter done"); +} + +pub fn rate_limiter( + rate: Arc, + inp: Receiver, + // stats: Arc, +) -> Receiver { + let (tx, rx) = async_channel::bounded(inp.capacity().unwrap_or(256)); + taskrun::spawn(rate_limiter_worker(rate, inp, tx)); + rx +} diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 157b54c..101d6b9 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -209,6 +209,11 @@ impl IntervalEma { // #[cfg(DISABLED)] stats_proc::stats_struct!(( + stats_struct( + name(CaProtoStats), + prefix(ca_proto), + counters(tcp_recv_count, tcp_recv_bytes,), + ), stats_struct( name(CaConnSetStats), counters(