Factor out stats collection

This commit is contained in:
Dominik Werder
2023-08-29 09:32:17 +02:00
parent e05970ef56
commit 017066d73a
7 changed files with 209 additions and 167 deletions

View File

@@ -1,3 +1,6 @@
pub mod inserthook;
pub mod types;
use async_channel::Receiver;
use async_channel::Sender;
use async_channel::WeakReceiver;
@@ -51,6 +54,7 @@ use tokio_postgres::Client as PgClient;
use tokio_postgres::Row as PgRow;
use tracing::info_span;
use tracing::Instrument;
use types::*;
const SEARCH_BATCH_MAX: usize = 256;
const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4;
@@ -86,7 +90,7 @@ static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0);
#[allow(unused)]
macro_rules! debug_batch {
(D$($arg:tt)*) => ();
// (D$($arg:tt)*) => ();
($($arg:tt)*) => (if false {
info!($($arg)*);
});
@@ -94,27 +98,12 @@ macro_rules! debug_batch {
#[allow(unused)]
macro_rules! trace_batch {
(D$($arg:tt)*) => ();
// (D$($arg:tt)*) => ();
($($arg:tt)*) => (if false {
trace!($($arg)*);
});
}
#[allow(non_snake_case)]
mod serde_Instant {
use serde::Serializer;
use std::time::Instant;
#[allow(unused)]
pub fn serialize<S>(val: &Instant, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let dur = val.elapsed();
ser.serialize_u64(dur.as_secs() * 1000 + dur.subsec_millis() as u64)
}
}
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionStateValue {
Unconnected,
@@ -287,7 +276,7 @@ impl Daemon {
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let datastore = Arc::new(datastore);
let (tx, rx) = async_channel::bounded(32);
let (tx, rx_daemon_ev) = async_channel::bounded(32);
let pgcs = {
let mut a = Vec::new();
for _ in 0..SEARCH_DB_PIPELINE_LEN {
@@ -308,79 +297,12 @@ impl Daemon {
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap));
let common_insert_item_queue_2 = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap));
// let common_insert_item_queue_2 = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap));
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
// Insert queue hook
if true {
tokio::spawn({
let rx = common_insert_item_queue
.receiver()
.ok_or_else(|| Error::with_msg_no_trace("can not derive receiver for insert queue adapter"))?;
let tx = common_insert_item_queue_2
.sender()
.ok_or_else(|| Error::with_msg_no_trace("can not derive sender for insert queue adapter"))?;
let insert_queue_counter = insert_queue_counter.clone();
let common_insert_item_queue_2 = common_insert_item_queue_2.clone();
async move {
let mut printed_last = Instant::now();
let mut histo = BTreeMap::new();
while let Ok(item) = rx.recv().await {
insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel);
//trace!("insert queue item {item:?}");
match &item {
QueryItem::Insert(item) => {
let shape_kind = match &item.shape {
netpod::Shape::Scalar => 0 as u32,
netpod::Shape::Wave(_) => 1,
netpod::Shape::Image(_, _) => 2,
};
histo
.entry(item.series.clone())
.and_modify(|(c, msp, lsp, pulse, _shape_kind)| {
*c += 1;
*msp = item.ts_msp;
*lsp = item.ts_lsp;
*pulse = item.pulse;
// TODO should check that shape_kind stays the same.
})
.or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind));
}
_ => {}
}
match tx.send(item).await {
Ok(_) => {}
Err(e) => {
error!("insert queue hook send {e}");
break;
}
}
let tsnow = Instant::now();
if tsnow.duration_since(printed_last) >= PRINT_ACTIVE_INTERVAL {
printed_last = tsnow;
let mut all: Vec<_> = histo
.iter()
.map(|(k, (c, msp, lsp, pulse, shape_kind))| {
(usize::MAX - *c, k.clone(), *msp, *lsp, *pulse, *shape_kind)
})
.collect();
all.sort_unstable();
info!("Active scalar");
for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 0).take(6) {
info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid);
}
info!("Active wave");
for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 1).take(6) {
info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid);
}
histo.clear();
}
}
info!("insert queue adapter ended");
common_insert_item_queue_2.drop_sender();
}
});
}
let rx = inserthook::active_channel_insert_hook(common_insert_item_queue.receiver().unwrap());
let common_insert_item_queue_2 = rx;
let ingest_commons = IngestCommons {
pgconf: Arc::new(opts.pgconf.clone()),
@@ -417,8 +339,6 @@ impl Daemon {
let insert_worker_count = 1000;
let use_rate_limit_queue = false;
let insert_rx_weak = common_insert_item_queue_2.receiver().unwrap().downgrade();
// TODO use a new stats type:
let store_stats = Arc::new(stats::CaConnStats::new());
let ttls = opts.ttls.clone();
@@ -473,7 +393,7 @@ impl Daemon {
connection_states: BTreeMap::new(),
channel_states: BTreeMap::new(),
tx,
rx,
rx: rx_daemon_ev,
chan_check_next: None,
search_tx,
ioc_finder_jh,
@@ -492,7 +412,7 @@ impl Daemon {
caconn_last_channel_check: Instant::now(),
stats: Arc::new(DaemonStats::new()),
shutting_down: false,
insert_rx_weak,
insert_rx_weak: common_insert_item_queue_2.downgrade(),
channel_info_query_tx,
};
Ok(ret)

View File

@@ -0,0 +1,81 @@
use crate::daemon::PRINT_ACTIVE_INTERVAL;
use async_channel::Receiver;
use async_channel::Sender;
use log::*;
use netpod::Shape;
use scywr::iteminsertqueue::QueryItem;
use std::collections::BTreeMap;
use std::time::Instant;
use taskrun::tokio;
/// Consumes insert-queue items from `rx`, forwards them unchanged to `tx`,
/// and keeps a per-series activity histogram that is logged periodically.
///
/// For every `QueryItem::Insert` we track, keyed by series id:
/// `(count, last ts_msp, last ts_lsp, last pulse, shape kind)` where shape
/// kind is 0 = scalar, 1 = wave, 2 = image. Every `PRINT_ACTIVE_INTERVAL`
/// the most active scalar and wave series are logged and the histogram is
/// cleared.
///
/// The worker ends when `rx` is closed or forwarding on `tx` fails.
pub async fn active_channel_insert_hook_worker(rx: Receiver<QueryItem>, tx: Sender<QueryItem>) {
    // TODO collect stats (counter equivalent of the old insert_queue_counter).
    let mut printed_last = Instant::now();
    let mut histo = BTreeMap::new();
    while let Ok(item) = rx.recv().await {
        if let QueryItem::Insert(item) = &item {
            let shape_kind: u32 = match &item.shape {
                Shape::Scalar => 0,
                Shape::Wave(_) => 1,
                Shape::Image(_, _) => 2,
            };
            histo
                .entry(item.series.clone())
                .and_modify(|(c, msp, lsp, pulse, _shape_kind)| {
                    *c += 1;
                    *msp = item.ts_msp;
                    *lsp = item.ts_lsp;
                    *pulse = item.pulse;
                    // TODO should check that shape_kind stays the same.
                })
                // NOTE(review): the count starts at 0, so the first item of a
                // series in each interval is not counted — confirm intended.
                .or_insert((0usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind));
        }
        match tx.send(item).await {
            Ok(_) => {}
            Err(e) => {
                error!("insert queue hook send {e}");
                break;
            }
        }
        let tsnow = Instant::now();
        if tsnow.duration_since(printed_last) >= PRINT_ACTIVE_INTERVAL {
            printed_last = tsnow;
            // Store (usize::MAX - count) so that the natural ascending sort
            // yields the most active series first.
            let mut all: Vec<_> = histo
                .iter()
                .map(|(k, (c, msp, lsp, pulse, shape_kind))| {
                    (usize::MAX - *c, k.clone(), *msp, *lsp, *pulse, *shape_kind)
                })
                .collect();
            all.sort_unstable();
            info!("Active scalar");
            for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 0).take(6) {
                info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid);
            }
            info!("Active wave");
            for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 1).take(6) {
                info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid);
            }
            histo.clear();
        }
    }
    info!("insert queue adapter ended");
}
/// Splices the activity-logging hook into an insert item pipeline.
///
/// Spawns a background worker that forwards every item from `inp` into a
/// new bounded channel (capacity 256) while recording per-series activity,
/// and returns the receiving end of that channel.
pub fn active_channel_insert_hook(inp: Receiver<QueryItem>) -> Receiver<QueryItem> {
    let (hook_tx, hook_rx) = async_channel::bounded(256);
    tokio::spawn(active_channel_insert_hook_worker(inp, hook_tx));
    hook_rx
}

View File

@@ -0,0 +1,11 @@
use async_channel::Receiver;
use netpod::Database;
use netpod::ScyllaConfig;
use scywr::insertworker::Ttls;
use serde::Serialize;
use series::series::Existence;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::net::SocketAddrV4;
use std::time::Instant;
use std::time::SystemTime;

View File

@@ -1,9 +1,10 @@
use crate::iteminsertqueue::insert_channel_status;
use crate::iteminsertqueue::insert_connection_status;
use crate::iteminsertqueue::insert_item;
use crate::iteminsertqueue::CommonInsertItemQueue;
use crate::iteminsertqueue::QueryItem;
use crate::store::DataStore;
use async_channel::Receiver;
use async_channel::Sender;
use err::Error;
use log::*;
use netpod::timeunits::MS;
@@ -69,77 +70,80 @@ pub struct InsertWorkerOpts {
pub insert_frac: Arc<AtomicU64>,
}
/// Forwards query items from `inp` to `tx`, dropping insert items when the
/// observed item rate exceeds the configured limit.
///
/// `rate` is the maximum allowed forward rate in items per second; it is
/// read per item so it can be tuned at runtime. The inter-item interval is
/// tracked with an exponential moving average: when the probed EMA interval
/// falls below the minimum allowed interval, droppable items (only
/// `QueryItem::Insert`) are discarded and counted in `stats`; all other item
/// kinds are always forwarded.
///
/// Ends when `inp` is closed or `tx` is disconnected.
async fn rate_limiter_worker(
    rate: Arc<AtomicU64>,
    inp: Receiver<QueryItem>,
    tx: Sender<QueryItem>,
    stats: Arc<stats::CaConnStats>,
) {
    let mut ts_forward_last = Instant::now();
    let mut ivl_ema = stats::Ema64::with_k(0.00001);
    while let Ok(item) = inp.recv().await {
        let ts_received = Instant::now();
        // Only insert items may be dropped; status and other items always pass.
        let allowed_to_drop = matches!(&item, QueryItem::Insert(_));
        let dt_min = {
            // Clamp to >= 1: a configured rate of 0 would otherwise divide by
            // zero and panic the worker.
            let rate2 = rate.load(Ordering::Acquire).max(1);
            Duration::from_nanos(SEC / rate2)
        };
        // Probe the EMA with this item included without committing the update:
        // the real EMA only advances for items actually forwarded.
        let mut ema2 = ivl_ema.clone();
        {
            let dt = ts_received.duration_since(ts_forward_last);
            let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
            // Cap at 100 ms so long idle gaps do not distort the average.
            ema2.update(dt_ns.min(MS * 100) as f32);
        }
        let ivl2 = Duration::from_nanos(ema2.ema() as u64);
        if allowed_to_drop && ivl2 < dt_min {
            stats.store_worker_ratelimit_drop_inc();
        } else if tx.send(item).await.is_err() {
            break;
        } else {
            let tsnow = Instant::now();
            let dt = tsnow.duration_since(ts_forward_last);
            let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
            ivl_ema.update(dt_ns.min(MS * 100) as f32);
            ts_forward_last = tsnow;
            stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release);
        }
    }
    info!("rate limiter done");
}
/// Wraps `inp` in a rate-limiting stage driven by `opts.store_workers_rate`.
///
/// Spawns `rate_limiter_worker` feeding a new bounded channel whose capacity
/// mirrors `inp` (falling back to 256 for unbounded channels) and returns the
/// receiver carrying the rate-limited item stream.
fn rate_limiter(
    inp: Receiver<QueryItem>,
    opts: Arc<InsertWorkerOpts>,
    stats: Arc<stats::CaConnStats>,
) -> Receiver<QueryItem> {
    let cap = inp.capacity().unwrap_or(256);
    let (limited_tx, limited_rx) = async_channel::bounded(cap);
    let rate = opts.store_workers_rate.clone();
    tokio::spawn(rate_limiter_worker(rate, inp, limited_tx, stats));
    limited_rx
}
pub async fn spawn_scylla_insert_workers(
scyconf: ScyllaConfig,
insert_scylla_sessions: usize,
insert_worker_count: usize,
insert_item_queue: Arc<CommonInsertItemQueue>,
item_inp: Receiver<QueryItem>,
insert_worker_opts: Arc<InsertWorkerOpts>,
store_stats: Arc<stats::CaConnStats>,
use_rate_limit_queue: bool,
ttls: Ttls,
) -> Result<Vec<JoinHandle<()>>, Error> {
let (q2_tx, q2_rx) = async_channel::bounded(
insert_item_queue
.receiver()
.map_or(20000, |x| x.capacity().unwrap_or(20000)),
);
{
let insert_worker_opts = insert_worker_opts.clone();
let stats = store_stats.clone();
let recv = insert_item_queue
.receiver()
.ok_or_else(|| Error::with_msg_no_trace("can not derive insert queue receiver"))?;
let store_stats = store_stats.clone();
let fut = async move {
if !use_rate_limit_queue {
return;
}
let mut ts_forward_last = Instant::now();
let mut ivl_ema = stats::Ema64::with_k(0.00001);
loop {
let item = if let Ok(x) = recv.recv().await {
x
} else {
break;
};
let ts_received = Instant::now();
let allowed_to_drop = match &item {
QueryItem::Insert(_) => true,
_ => false,
};
let dt_min = {
let rate = insert_worker_opts.store_workers_rate.load(Ordering::Acquire);
Duration::from_nanos(SEC / rate)
};
let mut ema2 = ivl_ema.clone();
{
let dt = ts_received.duration_since(ts_forward_last);
let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
ema2.update(dt_ns.min(MS * 100) as f32);
}
let ivl2 = Duration::from_nanos(ema2.ema() as u64);
if allowed_to_drop && ivl2 < dt_min {
//tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
stats.store_worker_ratelimit_drop_inc();
} else {
if q2_tx.send(item).await.is_err() {
break;
} else {
let tsnow = Instant::now();
let dt = tsnow.duration_since(ts_forward_last);
let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
ivl_ema.update(dt_ns.min(MS * 100) as f32);
ts_forward_last = tsnow;
store_stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release);
}
}
}
info!("intermediate queue done");
};
tokio::spawn(fut);
}
let item_inp = if use_rate_limit_queue {
rate_limiter(item_inp, insert_worker_opts.clone(), store_stats.clone())
} else {
item_inp
};
let mut jhs = Vec::new();
let mut data_stores = Vec::new();
for _ in 0..insert_scylla_sessions {
@@ -147,15 +151,9 @@ pub async fn spawn_scylla_insert_workers(
data_stores.push(data_store);
}
for worker_ix in 0..insert_worker_count {
let item_inp = item_inp.clone();
let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone();
let stats = store_stats.clone();
let recv = if use_rate_limit_queue {
q2_rx.clone()
} else {
insert_item_queue
.receiver()
.ok_or_else(|| Error::with_msg_no_trace("can not derive receiver"))?
};
let insert_worker_opts = insert_worker_opts.clone();
let fut = async move {
insert_worker_opts
@@ -165,7 +163,7 @@ pub async fn spawn_scylla_insert_workers(
let mut backoff = backoff_0.clone();
let mut i1 = 0;
loop {
let item = if let Ok(item) = recv.recv().await {
let item = if let Ok(item) = item_inp.recv().await {
stats.store_worker_item_recv_inc();
item
} else {

View File

@@ -1,6 +1,8 @@
pub use netpod::CONNECTION_STATUS_DIV;
use crate::store::DataStore;
use async_channel::Receiver;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use log::*;
@@ -12,6 +14,7 @@ use scylla::transport::errors::QueryError;
use series::SeriesId;
use stats::CaConnStats;
use std::net::SocketAddrV4;
use std::sync::Mutex;
use std::time::Duration;
use std::time::SystemTime;
@@ -253,15 +256,22 @@ impl CommonInsertItemQueueSender {
}
pub struct CommonInsertItemQueue {
sender: std::sync::Mutex<Option<async_channel::Sender<QueryItem>>>,
recv: async_channel::Receiver<QueryItem>,
sender: Mutex<Option<Sender<QueryItem>>>,
recv: Receiver<QueryItem>,
}
impl CommonInsertItemQueue {
pub fn new(cap: usize) -> Self {
let (tx, rx) = async_channel::bounded(cap);
Self {
sender: std::sync::Mutex::new(Some(tx)),
sender: Mutex::new(Some(tx)),
recv: rx,
}
}
/// Wraps an already-created channel pair in a queue.
///
/// Unlike `new`, the caller chooses the channel's capacity (and may hold
/// further clones of the pair).
pub fn from_tx_rx(tx: Sender<QueryItem>, rx: Receiver<QueryItem>) -> Self {
Self {
sender: Mutex::new(Some(tx)),
recv: rx,
}
}
@@ -276,7 +286,7 @@ impl CommonInsertItemQueue {
}
}
pub fn receiver(&self) -> Option<async_channel::Receiver<QueryItem>> {
/// Returns a clone of the queue's receiving end.
///
/// NOTE(review): this implementation always returns `Some`; the `Option`
/// signature is presumably kept for callers that handle `None` — confirm.
pub fn receiver(&self) -> Option<Receiver<QueryItem>> {
let ret = self.recv.clone();
Some(ret)
}

8
serde_helper/Cargo.toml Normal file
View File

@@ -0,0 +1,8 @@
[package]
name = "serde_helper"
version = "0.0.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }

14
serde_helper/src/lib.rs Normal file
View File

@@ -0,0 +1,14 @@
/// Serde helper for `std::time::Instant` fields, intended for use with
/// `#[serde(with = "serde_Instant")]`.
///
/// An `Instant` has no absolute representation, so it is serialized as the
/// elapsed time since that instant, in whole milliseconds.
// Made `pub`: as the sole item of the new serde_helper crate, a private
// module would leave the crate with no exported API at all.
#[allow(non_snake_case)]
pub mod serde_Instant {
    use serde::Serializer;
    use std::time::Instant;

    /// Serializes `val` as the number of whole milliseconds elapsed since it.
    ///
    /// # Errors
    /// Propagates any error from the underlying serializer.
    #[allow(unused)]
    pub fn serialize<S>(val: &Instant, ser: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // as_millis returns u128; the narrowing cast only truncates after
        // ~584 million years of elapsed time.
        ser.serialize_u64(val.elapsed().as_millis() as u64)
    }
}