WIP refactor metrics
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "daqingest"
|
||||
version = "0.3.0-aa.0"
|
||||
version = "0.3.0-aa.1"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2024"
|
||||
|
||||
|
||||
@@ -200,6 +200,7 @@ impl Daemon {
|
||||
|
||||
let ignore_writes = ingest_opts.scylla_ignore_writes();
|
||||
|
||||
let (insert_worker_output_tx, insert_worker_output_rx) = async_channel::bounded(256);
|
||||
let mut insert_worker_jhs = Vec::new();
|
||||
|
||||
if ingest_opts.scylla_disable() {
|
||||
@@ -209,6 +210,7 @@ impl Daemon {
|
||||
iqrx.st_rf1_rx,
|
||||
insert_worker_opts.clone(),
|
||||
insert_worker_stats.clone(),
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
@@ -219,6 +221,7 @@ impl Daemon {
|
||||
iqrx.st_rf3_rx,
|
||||
insert_worker_opts.clone(),
|
||||
insert_worker_stats.clone(),
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
@@ -229,6 +232,7 @@ impl Daemon {
|
||||
iqrx.mt_rf3_rx,
|
||||
insert_worker_opts.clone(),
|
||||
insert_worker_stats.clone(),
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
@@ -239,6 +243,7 @@ impl Daemon {
|
||||
iqrx.lt_rf3_rx,
|
||||
insert_worker_opts.clone(),
|
||||
insert_worker_stats.clone(),
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
@@ -249,6 +254,7 @@ impl Daemon {
|
||||
iqrx.lt_rf3_lat5_rx,
|
||||
insert_worker_opts.clone(),
|
||||
insert_worker_stats.clone(),
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
@@ -265,6 +271,7 @@ impl Daemon {
|
||||
insert_worker_stats.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
@@ -281,6 +288,7 @@ impl Daemon {
|
||||
insert_worker_stats.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
@@ -297,6 +305,7 @@ impl Daemon {
|
||||
insert_worker_stats.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
@@ -315,6 +324,7 @@ impl Daemon {
|
||||
insert_worker_stats.clone(),
|
||||
ingest_opts.use_rate_limit_queue(),
|
||||
ignore_writes,
|
||||
insert_worker_output_tx.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from_string)?;
|
||||
@@ -356,6 +366,29 @@ impl Daemon {
|
||||
//jh.await.map_err(|e| e.to_string()).map_err(Error::from)??;
|
||||
}
|
||||
|
||||
{
|
||||
// TODO join the task
|
||||
let tx = daemon_ev_tx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
match insert_worker_output_rx.recv().await {
|
||||
Ok(x) => {
|
||||
match tx.send(DaemonEvent::ScyllaInsertWorkerOutput(x)).await {
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
// TODO
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
// TODO
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8);
|
||||
|
||||
let ret = Self {
|
||||
@@ -709,6 +742,15 @@ impl Daemon {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
ScyllaInsertWorkerOutput(x) => {
|
||||
use scywr::insertworker::InsertWorkerOutputItem::*;
|
||||
match x {
|
||||
Metrics(x) => {
|
||||
self.daemon_metrics.scy_inswork().ingest(x);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
let dt = ts1.elapsed();
|
||||
if dt > Duration::from_millis(200) {
|
||||
|
||||
@@ -6,18 +6,16 @@ use crate::opts::RemoveOlderAll;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use dbpg::conn::PgClient;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::future;
|
||||
use futures_util::stream;
|
||||
use futures_util::StreamExt;
|
||||
use futures_util::TryStreamExt;
|
||||
use futures_util::future;
|
||||
use futures_util::stream;
|
||||
use log::*;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use netpod::Database;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
use netpod::TsMs;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use scywr::config::ScyllaIngestConfig;
|
||||
use scywr::scylla::prepared_statement::PreparedStatement;
|
||||
use scywr::scylla::transport::errors::QueryError;
|
||||
@@ -28,19 +26,20 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[cstm(name = "DaqingestTools")]
|
||||
pub enum Error {
|
||||
PgConn(#[from] dbpg::err::Error),
|
||||
Postgres(#[from] dbpg::postgres::Error),
|
||||
ScyllaSession(#[from] scywr::session::Error),
|
||||
ScyllaQuery(#[from] QueryError),
|
||||
ScyllaNextRowError(#[from] NextRowError),
|
||||
ScyllaSchema(#[from] scywr::schema::Error),
|
||||
ScyllaTypeCheck(#[from] scywr::scylla::deserialize::TypeCheckError),
|
||||
ParseError(String),
|
||||
InvalidValue,
|
||||
}
|
||||
autoerr::create_error_v1!(
|
||||
name(Error, "DaqingestTools"),
|
||||
enum variants {
|
||||
PgConn(#[from] dbpg::err::Error),
|
||||
Postgres(#[from] dbpg::postgres::Error),
|
||||
ScyllaSession(#[from] scywr::session::Error),
|
||||
ScyllaQuery(#[from] QueryError),
|
||||
ScyllaNextRowError(#[from] NextRowError),
|
||||
ScyllaSchema(#[from] scywr::schema::Error),
|
||||
ScyllaTypeCheck(#[from] scywr::scylla::deserialize::TypeCheckError),
|
||||
ParseError(String),
|
||||
InvalidValue,
|
||||
},
|
||||
);
|
||||
|
||||
pub async fn remove_older(
|
||||
backend: String,
|
||||
|
||||
@@ -95,6 +95,7 @@ const SILENCE_READ_NEXT_IVL: Duration = Duration::from_millis(1000 * 200);
|
||||
const POLL_READ_TIMEOUT: Duration = Duration::from_millis(1000 * 10);
|
||||
const DO_RATE_CHECK: bool = false;
|
||||
const CHANNEL_STATUS_PONG_QUIET: Duration = Duration::from_millis(1000 * 60 * 60);
|
||||
const METRICS_EMIT_IVL: Duration = Duration::from_millis(1000 * 1);
|
||||
|
||||
macro_rules! trace3 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
|
||||
|
||||
@@ -1117,6 +1118,7 @@ pub struct CaConn {
|
||||
trace_channel_poll: bool,
|
||||
ts_channel_status_pong_last: Instant,
|
||||
mett: stats::mett::CaConnMetrics,
|
||||
metrics_emit_last: Instant,
|
||||
}
|
||||
|
||||
impl Drop for CaConn {
|
||||
@@ -1186,6 +1188,7 @@ impl CaConn {
|
||||
trace_channel_poll: false,
|
||||
ts_channel_status_pong_last: tsnow,
|
||||
mett: stats::mett::CaConnMetrics::new(),
|
||||
metrics_emit_last: tsnow,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2096,15 +2099,13 @@ impl CaConn {
|
||||
self.mett.recv_read_notify_while_polling_idle().inc();
|
||||
}
|
||||
PollTickState::Wait(st3) => {
|
||||
let dt = tsnow.saturating_duration_since(st3.since);
|
||||
// TODO STATS
|
||||
// self.stats.caget_lat().ingest_dur_dms(dt);
|
||||
// TODO maintain histogram of read-notify latencies
|
||||
if self.read_ioids.remove(&st3.ioid).is_some() {
|
||||
self.mett.ioid_read_done().inc();
|
||||
} else {
|
||||
self.mett.ioid_read_error_not_found().inc();
|
||||
}
|
||||
let dt = tsnow.saturating_duration_since(st3.since);
|
||||
self.mett.caget_lat().push_dur_100us(dt);
|
||||
let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow);
|
||||
if self.trace_channel_poll {
|
||||
trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow);
|
||||
@@ -3052,8 +3053,6 @@ impl CaConn {
|
||||
TcpAsyncWriteRead::from(tcp),
|
||||
self.remote_addr_dbg.to_string(),
|
||||
self.opts.array_truncate,
|
||||
// self.ca_proto_stats.clone(),
|
||||
(),
|
||||
);
|
||||
self.state = CaConnState::Init;
|
||||
self.proto = Some(proto);
|
||||
@@ -3237,13 +3236,23 @@ impl CaConn {
|
||||
CaConnState::EndOfStream => {}
|
||||
}
|
||||
self.iqdqs.housekeeping();
|
||||
self.metrics_emit();
|
||||
if self.metrics_emit_last + METRICS_EMIT_IVL <= tsnow {
|
||||
self.metrics_emit_last = tsnow;
|
||||
self.metrics_emit();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn housekeeping_self(&mut self) {}
|
||||
|
||||
fn metrics_emit(&mut self) {
|
||||
if let Some(x) = self.proto.as_mut() {
|
||||
let mett = x.mett();
|
||||
mett.metrics_emit().inc();
|
||||
let m = mett.take_and_reset();
|
||||
self.mett.proto().ingest(m);
|
||||
}
|
||||
self.mett.metrics_emit().inc();
|
||||
let item = self.mett.take_and_reset();
|
||||
let item = CaConnEvent::new(Instant::now(), CaConnEventValue::Metrics(item));
|
||||
self.ca_conn_event_out_queue.push_back(item);
|
||||
@@ -3393,12 +3402,13 @@ impl CaConn {
|
||||
loop_max: u32,
|
||||
cx: &mut Context,
|
||||
id: &str,
|
||||
stats: FS,
|
||||
mut stats: FS,
|
||||
mett: &mut stats::mett::CaConnMetrics,
|
||||
) -> Result<Poll<Option<()>>, Error>
|
||||
where
|
||||
Q: Unpin,
|
||||
FB: Fn(&mut VecDeque<T>) -> Option<Q>,
|
||||
FS: Fn(&Q),
|
||||
FS: for<'a, 'b> FnMut(&'a Q, &'b mut stats::mett::CaConnMetrics),
|
||||
{
|
||||
let self_name = "attempt_flush_queue";
|
||||
use Poll::*;
|
||||
@@ -3417,7 +3427,7 @@ impl CaConn {
|
||||
}
|
||||
if sp.is_idle() {
|
||||
if let Some(item) = qu_to_si(qu) {
|
||||
stats(&item);
|
||||
stats(&item, mett);
|
||||
sp.as_mut().send_pin(item);
|
||||
} else {
|
||||
break;
|
||||
@@ -3469,7 +3479,8 @@ macro_rules! flush_queue {
|
||||
qu.shrink_to(qu.capacity() * 7 / 10);
|
||||
}
|
||||
let sp = obj.$sp.as_mut();
|
||||
match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) {
|
||||
let mett = &mut obj.mett;
|
||||
match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats, mett) {
|
||||
Ok(Ready(Some(()))) => {
|
||||
*$have.0 |= true;
|
||||
}
|
||||
@@ -3493,7 +3504,8 @@ macro_rules! flush_queue_dqs {
|
||||
qu.shrink_to(qu.capacity() * 7 / 10);
|
||||
}
|
||||
let sp = obj.iqsp.as_mut().$sp();
|
||||
match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) {
|
||||
let mett = &mut obj.mett;
|
||||
match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats, mett) {
|
||||
Ok(Ready(Some(()))) => {
|
||||
*$have.0 |= true;
|
||||
}
|
||||
@@ -3563,17 +3575,6 @@ impl Stream for CaConn {
|
||||
}
|
||||
|
||||
{
|
||||
let n = self.iqdqs.len();
|
||||
// TODO STATS
|
||||
self.stats.iiq_len().ingest(n as u32);
|
||||
}
|
||||
|
||||
{
|
||||
let stats2 = self.stats.clone();
|
||||
let stats_fn = move |item: &VecDeque<QueryItem>| {
|
||||
// TODO STATS
|
||||
stats2.iiq_batch_len().ingest(item.len() as u32);
|
||||
};
|
||||
flush_queue_dqs!(
|
||||
self,
|
||||
st_rf1_qu,
|
||||
@@ -3583,14 +3584,11 @@ impl Stream for CaConn {
|
||||
(&mut have_progress, &mut have_pending),
|
||||
"st_rf1_rx",
|
||||
cx,
|
||||
stats_fn
|
||||
|item: &VecDeque<QueryItem>, mett: &mut stats::mett::CaConnMetrics| {
|
||||
mett.iiq_batch_len().push_val(item.len() as u32);
|
||||
}
|
||||
);
|
||||
|
||||
let stats2 = self.stats.clone();
|
||||
let stats_fn = move |item: &VecDeque<QueryItem>| {
|
||||
// TODO STATS
|
||||
stats2.iiq_batch_len().ingest(item.len() as u32);
|
||||
};
|
||||
flush_queue_dqs!(
|
||||
self,
|
||||
st_rf3_qu,
|
||||
@@ -3600,14 +3598,11 @@ impl Stream for CaConn {
|
||||
(&mut have_progress, &mut have_pending),
|
||||
"st_rf3_rx",
|
||||
cx,
|
||||
stats_fn
|
||||
|item: &VecDeque<QueryItem>, mett: &mut stats::mett::CaConnMetrics| {
|
||||
mett.iiq_batch_len().push_val(item.len() as u32);
|
||||
}
|
||||
);
|
||||
|
||||
let stats2 = self.stats.clone();
|
||||
let stats_fn = move |item: &VecDeque<QueryItem>| {
|
||||
// TODO STATS
|
||||
stats2.iiq_batch_len().ingest(item.len() as u32);
|
||||
};
|
||||
flush_queue_dqs!(
|
||||
self,
|
||||
mt_rf3_qu,
|
||||
@@ -3617,14 +3612,11 @@ impl Stream for CaConn {
|
||||
(&mut have_progress, &mut have_pending),
|
||||
"mt_rf3_rx",
|
||||
cx,
|
||||
stats_fn
|
||||
|item: &VecDeque<QueryItem>, mett: &mut stats::mett::CaConnMetrics| {
|
||||
mett.iiq_batch_len().push_val(item.len() as u32);
|
||||
}
|
||||
);
|
||||
|
||||
let stats2 = self.stats.clone();
|
||||
let stats_fn = move |item: &VecDeque<QueryItem>| {
|
||||
// TODO STATS
|
||||
stats2.iiq_batch_len().ingest(item.len() as u32);
|
||||
};
|
||||
flush_queue_dqs!(
|
||||
self,
|
||||
lt_rf3_qu,
|
||||
@@ -3634,14 +3626,11 @@ impl Stream for CaConn {
|
||||
(&mut have_progress, &mut have_pending),
|
||||
"lt_rf3_rx",
|
||||
cx,
|
||||
stats_fn
|
||||
|item: &VecDeque<QueryItem>, mett: &mut stats::mett::CaConnMetrics| {
|
||||
mett.iiq_batch_len().push_val(item.len() as u32);
|
||||
}
|
||||
);
|
||||
|
||||
let stats2 = self.stats.clone();
|
||||
let stats_fn = move |item: &VecDeque<QueryItem>| {
|
||||
// TODO STATS
|
||||
stats2.iiq_batch_len().ingest(item.len() as u32);
|
||||
};
|
||||
flush_queue_dqs!(
|
||||
self,
|
||||
lt_rf3_lat5_qu,
|
||||
@@ -3651,7 +3640,9 @@ impl Stream for CaConn {
|
||||
(&mut have_progress, &mut have_pending),
|
||||
"lt_rf3_lat5_rx",
|
||||
cx,
|
||||
stats_fn
|
||||
|item: &VecDeque<QueryItem>, mett: &mut stats::mett::CaConnMetrics| {
|
||||
mett.iiq_batch_len().push_val(item.len() as u32);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -3667,7 +3658,7 @@ impl Stream for CaConn {
|
||||
(&mut have_progress, &mut have_pending),
|
||||
"chinf",
|
||||
cx,
|
||||
|_| {}
|
||||
|_, _| {}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -3795,19 +3786,9 @@ impl Stream for CaConn {
|
||||
let poll_ts2 = Instant::now();
|
||||
let dt = poll_ts2.saturating_duration_since(poll_ts1);
|
||||
if self.trace_channel_poll {
|
||||
// TODO STATS
|
||||
self.stats.poll_all_dt().ingest_dur_dms(dt);
|
||||
if dt >= Duration::from_millis(10) {
|
||||
trace!("long poll {dt:?}");
|
||||
} else if dt >= Duration::from_micros(400) {
|
||||
// TODO STATS
|
||||
let v = self.stats.poll_all_dt.to_display();
|
||||
let ip = self.remote_addr_dbg;
|
||||
trace!("poll_all_dt {ip} {v}");
|
||||
}
|
||||
self.mett.poll_all_dt().push_dur_100us(dt);
|
||||
}
|
||||
// TODO STATS
|
||||
// self.stats.poll_reloops().ingest(reloops);
|
||||
self.mett.poll_reloops().push_val(reloops);
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,9 +21,9 @@ use proto::CaProto;
|
||||
use scywr::insertqueues::InsertDeques;
|
||||
use scywr::insertqueues::InsertQueuesTx;
|
||||
use scywr::iteminsertqueue::QueryItem;
|
||||
use stats::rand_xoshiro::Xoshiro128PlusPlus;
|
||||
use stats::CaConnStats;
|
||||
use stats::CaProtoStats;
|
||||
use stats::rand_xoshiro::Xoshiro128PlusPlus;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::net::SocketAddrV4;
|
||||
@@ -160,9 +160,11 @@ impl Stream for CaConn {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
let mut durs = DurationMeasureSteps::new();
|
||||
self.stats.poll_fn_begin().inc();
|
||||
// TODO STATS
|
||||
// self.stats.poll_fn_begin().inc();
|
||||
let ret = loop {
|
||||
self.stats.poll_loop_begin().inc();
|
||||
// TODO STATS
|
||||
// self.stats.poll_loop_begin().inc();
|
||||
let qlen = self.iqdqs.len();
|
||||
if qlen >= self.opts.insert_queue_max * 2 / 3 {
|
||||
self.stats.insert_item_queue_pressure().inc();
|
||||
@@ -191,11 +193,6 @@ impl Stream for CaConn {
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let n = self.iqdqs.len();
|
||||
self.stats.iiq_len().ingest(n as u32);
|
||||
}
|
||||
|
||||
{
|
||||
// let stats2 = self.stats.clone();
|
||||
// let stats_fn = move |item: &VecDeque<QueryItem>| {
|
||||
|
||||
@@ -29,6 +29,7 @@ pub enum DaemonEvent {
|
||||
Shutdown,
|
||||
ConfigReload(Sender<u64>),
|
||||
GetMetrics(Sender<MetricsPrometheusShort>),
|
||||
ScyllaInsertWorkerOutput(scywr::insertworker::InsertWorkerOutputItem),
|
||||
}
|
||||
|
||||
impl DaemonEvent {
|
||||
@@ -43,6 +44,7 @@ impl DaemonEvent {
|
||||
Shutdown => format!("Shutdown"),
|
||||
ConfigReload(..) => format!("ConfigReload"),
|
||||
GetMetrics(..) => format!("GetMetrics"),
|
||||
ScyllaInsertWorkerOutput(..) => format!("ScyllaInsertWorkerOutput"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ impl From<&InsertQueuesTx> for InsertQueuesTxMetrics {
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct MetricsPrometheusShort {
|
||||
counters: Vec<(String, u64)>,
|
||||
counters: Vec<String>,
|
||||
}
|
||||
|
||||
impl MetricsPrometheusShort {
|
||||
@@ -129,7 +129,7 @@ impl MetricsPrometheusShort {
|
||||
use std::fmt::Write;
|
||||
let mut s = String::new();
|
||||
for e in self.counters.iter() {
|
||||
write!(&mut s, "{} {}\n", e.0, e.1).unwrap();
|
||||
write!(&mut s, "{}\n", e).unwrap();
|
||||
}
|
||||
s
|
||||
}
|
||||
@@ -138,7 +138,7 @@ impl MetricsPrometheusShort {
|
||||
impl From<&stats::mett::DaemonMetrics> for MetricsPrometheusShort {
|
||||
fn from(value: &stats::mett::DaemonMetrics) -> Self {
|
||||
Self {
|
||||
counters: value.to_flatten_prometheus(),
|
||||
counters: value.to_flatten_prometheus("daemon"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::iteminsertqueue::insert_item_fut;
|
||||
use crate::iteminsertqueue::insert_msp_fut;
|
||||
use crate::store::DataStore;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use atomic::AtomicU64;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
@@ -100,6 +101,11 @@ async fn back_off_sleep(backoff_dt: &mut Duration) {
|
||||
tokio::time::sleep(*backoff_dt).await;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum InsertWorkerOutputItem {
|
||||
Metrics(stats::mett::ScyllaInsertWorker),
|
||||
}
|
||||
|
||||
pub struct InsertWorkerOpts {
|
||||
pub store_workers_rate: Arc<AtomicU64>,
|
||||
pub insert_workers_running: Arc<AtomicU64>,
|
||||
@@ -118,6 +124,7 @@ pub async fn spawn_scylla_insert_workers(
|
||||
store_stats: Arc<stats::InsertWorkerStats>,
|
||||
use_rate_limit_queue: bool,
|
||||
ignore_writes: bool,
|
||||
tx: Sender<InsertWorkerOutputItem>,
|
||||
) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
|
||||
let item_inp = if use_rate_limit_queue {
|
||||
crate::ratelimit::rate_limiter(insert_worker_opts.store_workers_rate.clone(), item_inp)
|
||||
@@ -140,6 +147,7 @@ pub async fn spawn_scylla_insert_workers(
|
||||
Some(data_store),
|
||||
ignore_writes,
|
||||
store_stats.clone(),
|
||||
tx.clone(),
|
||||
));
|
||||
jhs.push(jh);
|
||||
}
|
||||
@@ -152,6 +160,7 @@ pub async fn spawn_scylla_insert_workers_dummy(
|
||||
item_inp: Receiver<VecDeque<QueryItem>>,
|
||||
insert_worker_opts: Arc<InsertWorkerOpts>,
|
||||
store_stats: Arc<stats::InsertWorkerStats>,
|
||||
tx: Sender<InsertWorkerOutputItem>,
|
||||
) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
|
||||
let mut jhs = Vec::new();
|
||||
for worker_ix in 0..insert_worker_count {
|
||||
@@ -164,6 +173,7 @@ pub async fn spawn_scylla_insert_workers_dummy(
|
||||
data_store,
|
||||
true,
|
||||
store_stats.clone(),
|
||||
tx.clone(),
|
||||
));
|
||||
jhs.push(jh);
|
||||
}
|
||||
@@ -178,9 +188,13 @@ async fn worker_streamed(
|
||||
data_store: Option<Arc<DataStore>>,
|
||||
ignore_writes: bool,
|
||||
stats: Arc<InsertWorkerStats>,
|
||||
tx: Sender<InsertWorkerOutputItem>,
|
||||
) -> Result<(), Error> {
|
||||
debug_setup!("worker_streamed begin");
|
||||
stats.worker_start().inc();
|
||||
let tsnow = Instant::now();
|
||||
let mut mett = stats::mett::ScyllaInsertWorker::new();
|
||||
let mut mett_emit_last = tsnow;
|
||||
let metrics_ivl = Duration::from_millis(1000);
|
||||
insert_worker_opts
|
||||
.insert_workers_running
|
||||
.fetch_add(1, atomic::Ordering::AcqRel);
|
||||
@@ -199,9 +213,10 @@ async fn worker_streamed(
|
||||
debug_setup!("waiting for item");
|
||||
while let Some(item) = stream.next().await {
|
||||
trace_item_execute!("see item");
|
||||
let tsnow = Instant::now();
|
||||
match item {
|
||||
Ok(_) => {
|
||||
stats.inserted_values().inc();
|
||||
mett.job_ok().inc();
|
||||
// TODO compute the insert latency bin and count.
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -215,9 +230,22 @@ async fn worker_streamed(
|
||||
},
|
||||
_ => e.into(),
|
||||
};
|
||||
mett.job_err().inc();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
}
|
||||
}
|
||||
if mett_emit_last + metrics_ivl <= tsnow {
|
||||
mett_emit_last = tsnow;
|
||||
let m = mett.take_and_reset();
|
||||
let item = InsertWorkerOutputItem::Metrics(m);
|
||||
match tx.send(item).await {
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
error!("insert worker can not emit metrics");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let mut stream = Box::pin(stream);
|
||||
|
||||
@@ -13,4 +13,5 @@ rand_xoshiro = "0.6.0"
|
||||
stats_types = { path = "../stats_types" }
|
||||
stats_proc = { path = "../stats_proc" }
|
||||
log = { path = "../log" }
|
||||
mettrics = { version = "0.0.5", path = "../../mettrics" }
|
||||
mettrics = { version = "0.0.6", path = "../../mettrics" }
|
||||
ca_proto = { path = "../../daqbuf-ca-proto", package = "daqbuf-ca-proto" }
|
||||
|
||||
@@ -1,6 +1,16 @@
|
||||
mod Metrics {
|
||||
type StructName = ScyllaInsertWorker;
|
||||
enum counters {
|
||||
metrics_emit,
|
||||
job_ok,
|
||||
job_err,
|
||||
}
|
||||
}
|
||||
|
||||
mod Metrics {
|
||||
type StructName = CaConnMetrics;
|
||||
enum counters {
|
||||
metrics_emit,
|
||||
ioid_read_begin,
|
||||
ioid_read_done,
|
||||
ioid_read_timeout,
|
||||
@@ -49,6 +59,17 @@ mod Metrics {
|
||||
get_series_id_ok,
|
||||
channel_add_exists,
|
||||
}
|
||||
enum histolog2s {
|
||||
clock_ioc_diff_abs,
|
||||
caget_lat,
|
||||
poll_reloops,
|
||||
poll_all_dt,
|
||||
iiq_batch_len,
|
||||
}
|
||||
mod Compose {
|
||||
type Input = ca_proto::mett::CaProtoMetrics;
|
||||
type Name = proto;
|
||||
}
|
||||
}
|
||||
|
||||
mod Metrics {
|
||||
@@ -65,4 +86,8 @@ mod Metrics {
|
||||
type Input = CaConnSetMetrics;
|
||||
type Name = ca_conn_set;
|
||||
}
|
||||
mod Compose {
|
||||
type Input = ScyllaInsertWorker;
|
||||
type Name = scy_inswork;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use mettrics::types::CounterU32;
|
||||
use mettrics::types::HistoLog2;
|
||||
|
||||
mettrics::macros::make_metrics!("mettdecl.rs");
|
||||
|
||||
@@ -337,7 +337,6 @@ stats_proc::stats_struct!((
|
||||
inserts_msp_grid,
|
||||
inserts_queue_pop_for_global,
|
||||
inserts_queue_push,
|
||||
inserts_queue_drop,
|
||||
insert_item_queue_pressure,
|
||||
insert_item_queue_full,
|
||||
out_queue_full,
|
||||
@@ -359,10 +358,6 @@ stats_proc::stats_struct!((
|
||||
channel_info_insert_done,
|
||||
ivl_insert_done,
|
||||
mute_insert_done,
|
||||
loop1_count,
|
||||
loop2_count,
|
||||
loop3_count,
|
||||
loop4_count,
|
||||
command_can_not_reply,
|
||||
time_handle_conn_listen,
|
||||
time_handle_peer_ready,
|
||||
@@ -379,12 +374,6 @@ stats_proc::stats_struct!((
|
||||
ping_start,
|
||||
ping_no_proto,
|
||||
pong_timeout,
|
||||
poll_fn_begin,
|
||||
poll_loop_begin,
|
||||
poll_reloop,
|
||||
poll_pending,
|
||||
poll_no_progress_no_pending,
|
||||
poll_wake_break,
|
||||
storage_queue_send,
|
||||
storage_queue_pending,
|
||||
event_add_res_recv,
|
||||
@@ -412,16 +401,7 @@ stats_proc::stats_struct!((
|
||||
ca_proto_version_later,
|
||||
no_cid_for_subid,
|
||||
),
|
||||
histolog2s(
|
||||
poll_all_dt,
|
||||
poll_op3_dt,
|
||||
poll_reloops,
|
||||
pong_recv_lat,
|
||||
ca_ts_off,
|
||||
iiq_len,
|
||||
iiq_batch_len,
|
||||
caget_lat,
|
||||
),
|
||||
histolog2s(poll_op3_dt, pong_recv_lat, ca_ts_off,),
|
||||
),
|
||||
agg(name(CaConnStatsAgg), parent(CaConnStats)),
|
||||
diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),
|
||||
@@ -429,8 +409,8 @@ stats_proc::stats_struct!((
|
||||
name(CaProtoStats),
|
||||
prefix(ca_proto),
|
||||
counters(
|
||||
tcp_recv_count,
|
||||
tcp_recv_bytes,
|
||||
// tcp_recv_count,
|
||||
// tcp_recv_bytes,
|
||||
protocol_issue,
|
||||
payload_std_too_large,
|
||||
payload_ext_but_small,
|
||||
|
||||
Reference in New Issue
Block a user