Remove old stats struct

This commit is contained in:
Dominik Werder
2025-05-06 15:29:09 +02:00
parent af3b550a43
commit 573bb83b64
18 changed files with 253 additions and 444 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.3.0-aa.2"
version = "0.3.0-aa.3"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"

View File

@@ -162,6 +162,10 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), Error> {
let opstr = if do_change { "change" } else { "check" };
info!("start scylla schema {}", opstr);
info!("{:?}", opts.scylla_config_st());
info!("{:?}", opts.scylla_config_mt());
info!("{:?}", opts.scylla_config_lt());
info!("{:?}", opts.scylla_config_st_rf1());
scywr::schema::migrate_scylla_data_schema_all_rt(
[
&opts.scylla_config_st(),

View File

@@ -23,9 +23,6 @@ use scywr::config::ScyllaIngestConfig;
use scywr::insertqueues::InsertQueuesRx;
use scywr::insertqueues::InsertQueuesTx;
use scywr::insertworker::InsertWorkerOpts;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::SeriesByChannelStats;
use stats::rand_xoshiro::rand_core::RngCore;
use std::sync::Arc;
use std::sync::atomic;
@@ -68,9 +65,6 @@ pub struct Daemon {
count_assigned: usize,
last_status_print: SystemTime,
insert_workers_jhs: Vec<JoinHandle<Result<(), scywr::insertworker::Error>>>,
stats: Arc<DaemonStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
shutting_down: bool,
connset_ctrl: CaConnSetCtrl,
connset_status_last: Instant,
@@ -90,15 +84,11 @@ impl Daemon {
pub async fn new(opts: DaemonOpts, ingest_opts: CaIngestOpts) -> Result<Self, Error> {
let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32);
let series_by_channel_stats = Arc::new(SeriesByChannelStats::new());
let insert_worker_stats = Arc::new(InsertWorkerStats::new());
// TODO keep join handles and await later
let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers::<
dbpg::seriesbychannel::SalterRandom,
>(2, &opts.pgconf, series_by_channel_stats.clone())
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let (channel_info_query_tx, jhs, jh) =
dbpg::seriesbychannel::start_lookup_workers::<dbpg::seriesbychannel::SalterRandom>(2, &opts.pgconf)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
// TODO so far a dummy
let (series_conf_by_id_tx, _series_conf_by_id_rx) = async_channel::bounded(16);
@@ -207,7 +197,6 @@ impl Daemon {
ingest_opts.insert_worker_concurrency(),
iqrx.st_rf1_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
insert_worker_output_tx.clone(),
)
.await
@@ -218,7 +207,6 @@ impl Daemon {
ingest_opts.insert_worker_concurrency(),
iqrx.st_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
insert_worker_output_tx.clone(),
)
.await
@@ -229,7 +217,6 @@ impl Daemon {
ingest_opts.insert_worker_concurrency(),
iqrx.mt_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
insert_worker_output_tx.clone(),
)
.await
@@ -240,7 +227,6 @@ impl Daemon {
ingest_opts.insert_worker_concurrency(),
iqrx.lt_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
insert_worker_output_tx.clone(),
)
.await
@@ -251,7 +237,6 @@ impl Daemon {
ingest_opts.insert_worker_concurrency(),
iqrx.lt_rf3_lat5_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
insert_worker_output_tx.clone(),
)
.await
@@ -266,7 +251,6 @@ impl Daemon {
ingest_opts.insert_worker_concurrency(),
iqrx.st_rf1_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
ignore_writes,
insert_worker_output_tx.clone(),
@@ -283,7 +267,6 @@ impl Daemon {
ingest_opts.insert_worker_concurrency(),
iqrx.st_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
ignore_writes,
insert_worker_output_tx.clone(),
@@ -300,7 +283,6 @@ impl Daemon {
ingest_opts.insert_worker_concurrency(),
iqrx.mt_rf3_rx,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
ignore_writes,
insert_worker_output_tx.clone(),
@@ -319,7 +301,6 @@ impl Daemon {
ingest_opts.insert_worker_concurrency(),
lt_rx_combined,
insert_worker_opts.clone(),
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
ignore_writes,
insert_worker_output_tx.clone(),
@@ -328,7 +309,6 @@ impl Daemon {
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
};
let stats = Arc::new(DaemonStats::new());
#[cfg(feature = "bsread")]
if let Some(bsaddr) = &opts.test_bsread_addr {
@@ -402,9 +382,6 @@ impl Daemon {
count_assigned: 0,
last_status_print: SystemTime::now(),
insert_workers_jhs: insert_worker_jhs,
stats,
insert_worker_stats,
series_by_channel_stats,
shutting_down: false,
connset_ctrl: conn_set_ctrl,
connset_status_last: Instant::now(),
@@ -420,10 +397,6 @@ impl Daemon {
Ok(ret)
}
fn stats(&self) -> &Arc<DaemonStats> {
&self.stats
}
async fn check_health(&mut self, ts1: Instant) -> Result<(), Error> {
self.check_health_connset(ts1)?;
Ok(())
@@ -862,7 +835,6 @@ impl Daemon {
pub async fn spawn_metrics(&mut self) -> Result<(), Error> {
let tx = self.tx.clone();
let daemon_stats = self.stats().clone();
let connset_cmd_tx = self.connset_ctrl.sender().clone();
let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone()));
let rres = RoutesResources::new(
@@ -880,13 +852,7 @@ impl Daemon {
);
let rres = Arc::new(rres);
let metrics_jh = {
let stats_set = StatsSet::new(
daemon_stats,
self.insert_worker_stats.clone(),
self.series_by_channel_stats.clone(),
self.connset_ctrl.ioc_finder_stats().clone(),
self.opts.insert_frac.clone(),
);
let stats_set = StatsSet::new(self.opts.insert_frac.clone());
let fut = netfetch::metrics::metrics_service(
self.ingest_opts.api_bind(),
dcom,

View File

@@ -1,9 +1,5 @@
use crate::opts::CaFind;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use stats::IocFinderStats;
use std::sync::Arc;
use std::time::Duration;
autoerr::create_error_v1!(
@@ -26,7 +22,6 @@ pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> {
let batch_run_max = Duration::from_millis(1200);
let in_flight_max = 1;
let batch_size = 1;
let stats = Arc::new(IocFinderStats::new());
channels_input_tx.send(cmd.channel).await.unwrap();
let stream = netfetch::ca::findioc::FindIocStream::new(
channels_input_rx,
@@ -35,7 +30,6 @@ pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> {
batch_run_max,
in_flight_max,
batch_size,
stats,
);
let deadline = taskrun::tokio::time::sleep(Duration::from_millis(2000));
let mut stream = Box::pin(stream.take_until(deadline));

View File

@@ -15,7 +15,6 @@ use netpod::SeriesKind;
use netpod::Shape;
use serde::Serialize;
use series::SeriesId;
use stats::SeriesByChannelStats;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -144,16 +143,11 @@ struct Worker {
qu_select: PgStatement,
qu_insert: PgStatement,
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
stats: Arc<SeriesByChannelStats>,
pg_client_jh: JoinHandle<Result<(), crate::err::Error>>,
}
impl Worker {
async fn new(
db: &Database,
batch_rx: Receiver<Vec<ChannelInfoQuery>>,
stats: Arc<SeriesByChannelStats>,
) -> Result<Self, Error> {
async fn new(db: &Database, batch_rx: Receiver<Vec<ChannelInfoQuery>>) -> Result<Self, Error> {
use tokio_postgres::types::Type;
debug!("Worker make_pg_client");
let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?;
@@ -206,7 +200,6 @@ impl Worker {
qu_select,
qu_insert,
batch_rx,
stats,
pg_client_jh,
};
Ok(ret)
@@ -215,8 +208,9 @@ impl Worker {
async fn work<FR: HashSalter>(&mut self) -> Result<(), Error> {
let batch_rx = self.batch_rx.clone();
while let Ok(batch) = batch_rx.recv().await {
self.stats.recv_batch().inc();
self.stats.recv_items().add(batch.len() as _);
// TODO
// stats.recv_batch().inc();
// stats.recv_items().add(batch.len() as _);
for x in &batch {
trace3!(
"search for {} {} {:?} {:?}",
@@ -233,7 +227,8 @@ impl Worker {
match self.pg.execute("commit", &[]).await {
Ok(n) => {
let dt = ts1.elapsed();
self.stats.commit_duration_ms().ingest((1e3 * dt.as_secs_f32()) as u32);
// TODO
// stats.commit_duration_ms().ingest((1e3 * dt.as_secs_f32()) as u32);
if dt > Duration::from_millis(40) {
debug!("commit {} {:.0} ms", n, dt.as_secs_f32());
}
@@ -313,7 +308,8 @@ impl Worker {
match e.0.tx.make_send(item).await {
Ok(()) => {}
Err(_) => {
self.stats.res_tx_fail.inc();
// TODO
// stats.res_tx_fail.inc();
}
};
}
@@ -603,7 +599,6 @@ pub trait HashSalter {
pub async fn start_lookup_workers<FR: HashSalter>(
worker_count: usize,
db: &Database,
stats: Arc<SeriesByChannelStats>,
) -> Result<
(
Sender<ChannelInfoQuery>,
@@ -619,7 +614,7 @@ pub async fn start_lookup_workers<FR: HashSalter>(
let (batch_rx, bjh) = batchtools::batcher::batch(inp_cap, timeout, batch_out_cap, query_rx);
let mut jhs = Vec::new();
for _ in 0..worker_count {
let mut worker = Worker::new(db, batch_rx.clone(), stats.clone()).await?;
let mut worker = Worker::new(db, batch_rx.clone()).await?;
let jh = tokio::task::spawn(async move { worker.work::<FR>().await });
jhs.push(jh);
}
@@ -727,7 +722,6 @@ fn test_series_by_channel_01() {
let channel = "chn-test-00";
let channel_01 = "chn-test-01";
let channel_02 = "chn-test-02";
let series_by_channel_stats = Arc::new(SeriesByChannelStats::new());
let pgconf = test_db_conf();
if false {
psql_play(&pgconf).await?;
@@ -767,8 +761,7 @@ fn test_series_by_channel_01() {
}
// TODO keep join handles and await later
let (channel_info_query_tx, _jhs, _jh) =
dbpg::seriesbychannel::start_lookup_workers::<SalterTest>(1, &pgconf, series_by_channel_stats.clone())
.await?;
dbpg::seriesbychannel::start_lookup_workers::<SalterTest>(1, &pgconf).await?;
let mut rxs = Vec::new();
let rx = {

View File

@@ -2337,7 +2337,7 @@ impl CaConn {
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
// binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?;
binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?;
{
let wres = writer.write(CaWriterValue::new(value, crst), tscaproto, tsev, iqdqs)?;
crst.status_emit_count += wres.nstatus() as u64;

View File

@@ -44,7 +44,6 @@ use statemap::ConnectionState;
use statemap::ConnectionStateValue;
use statemap::WithStatusSeriesIdState;
use statemap::WithStatusSeriesIdStateInner;
use stats::IocFinderStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
@@ -247,7 +246,6 @@ pub enum CaConnSetItem {
pub struct CaConnSetCtrl {
tx: Sender<CaConnSetEvent>,
rx: Receiver<CaConnSetItem>,
ioc_finder_stats: Arc<IocFinderStats>,
jh: JoinHandle<Result<(), Error>>,
}
@@ -305,10 +303,6 @@ impl CaConnSetCtrl {
self.jh.await??;
Ok(())
}
pub fn ioc_finder_stats(&self) -> &Arc<IocFinderStats> {
&self.ioc_finder_stats
}
}
#[derive(Debug)]
@@ -432,14 +426,8 @@ impl CaConnSet {
let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200);
let (connset_out_tx, connset_out_rx) = async_channel::bounded(200);
let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(400);
let ioc_finder_stats = Arc::new(IocFinderStats::new());
let (find_ioc_query_tx, ioc_finder_jh) = super::finder::start_finder(
find_ioc_res_tx.clone(),
backend.clone(),
ingest_opts,
ioc_finder_stats.clone(),
)
.unwrap();
let (find_ioc_query_tx, ioc_finder_jh) =
super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), ingest_opts).unwrap();
let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400);
let connset = Self {
ticker: Self::new_self_ticker(),
@@ -485,7 +473,6 @@ impl CaConnSet {
CaConnSetCtrl {
tx: connset_inp_tx,
rx: connset_out_rx,
ioc_finder_stats,
jh,
}
}

View File

@@ -12,9 +12,7 @@ use dbpg::iocindex::IocSearchIndexWorker;
use dbpg::postgres::Row as PgRow;
use log::*;
use netpod::Database;
use stats::IocFinderStats;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
@@ -75,10 +73,9 @@ pub fn start_finder(
tx: Sender<VecDeque<FindIocRes>>,
backend: String,
opts: CaIngestOpts,
stats: Arc<IocFinderStats>,
) -> Result<(Sender<IocAddrQuery>, JoinHandle<Result<(), Error>>), Error> {
let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX);
let jh = taskrun::spawn(finder_full(qrx, tx, backend, opts, stats));
let jh = taskrun::spawn(finder_full(qrx, tx, backend, opts));
Ok((qtx, jh))
}
@@ -87,17 +84,10 @@ async fn finder_full(
tx: Sender<VecDeque<FindIocRes>>,
backend: String,
opts: CaIngestOpts,
stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
let (tx1, rx1) = async_channel::bounded(20);
let jh1 = taskrun::spawn(finder_worker(
qrx,
tx1,
backend,
opts.postgresql_config().clone(),
stats.clone(),
));
let jh2 = taskrun::spawn(finder_network_if_not_found(rx1, tx, opts.clone(), stats));
let jh1 = taskrun::spawn(finder_worker(qrx, tx1, backend, opts.postgresql_config().clone()));
let jh2 = taskrun::spawn(finder_network_if_not_found(rx1, tx, opts.clone()));
jh1.await??;
trace!("finder::finder_full awaited A");
jh2.await??;
@@ -111,7 +101,6 @@ async fn finder_worker(
tx: Sender<VecDeque<FindIocRes>>,
backend: String,
db: Database,
stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
// TODO do something with join handle
let (batch_rx, jh_batch) =
@@ -123,7 +112,6 @@ async fn finder_worker(
tx.clone(),
backend.clone(),
db.clone(),
stats.clone(),
));
jhs.push(jh);
}
@@ -141,7 +129,6 @@ async fn finder_worker_single(
tx: Sender<VecDeque<FindIocRes>>,
backend: String,
db: Database,
stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
debug!("finder_worker_single make_pg_client");
let (pg, jh) = make_pg_client(&db).await?;
@@ -159,8 +146,9 @@ async fn finder_worker_single(
for e in batch.iter().filter(|x| series::dbg::dbg_chn(x.name())) {
info!("searching database for {:?}", e);
}
stats.dbsearcher_batch_recv().inc();
stats.dbsearcher_item_recv().add(batch.len() as _);
// TODO
// stats.dbsearcher_batch_recv().inc();
// stats.dbsearcher_item_recv().add(batch.len() as _);
let ts1 = Instant::now();
let (batch, pass_through) = batch.into_iter().fold((Vec::new(), Vec::new()), |(mut a, mut b), x| {
if x.use_cache() {
@@ -189,9 +177,9 @@ async fn finder_worker_single(
}
match qres {
Ok(rows) => {
stats.dbsearcher_select_res_0().add(rows.len() as _);
// stats.dbsearcher_select_res_0().add(rows.len() as _);
if rows.len() != batch.len() {
stats.dbsearcher_select_error_len_mismatch().inc();
// stats.dbsearcher_select_error_len_mismatch().inc();
error!("query result len {} batch len {}", rows.len(), batch.len());
tokio::time::sleep(Duration::from_millis(1000)).await;
continue;
@@ -215,8 +203,9 @@ async fn finder_worker_single(
let items_len = items.len();
match tx.send(items).await {
Ok(_) => {
stats.dbsearcher_batch_send().inc();
stats.dbsearcher_item_send().add(items_len as _);
// TODO
// stats.dbsearcher_batch_send().inc();
// stats.dbsearcher_item_send().add(items_len as _);
}
Err(e) => {
error!("finder sees: {}", e);
@@ -243,10 +232,9 @@ async fn finder_network_if_not_found(
rx: Receiver<VecDeque<FindIocRes>>,
tx: Sender<VecDeque<FindIocRes>>,
opts: CaIngestOpts,
stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
let self_name = "finder_network_if_not_found";
let (net_tx, net_rx, jh_ca_search) = ca_search_workers_start(&opts, stats.clone()).await?;
let (net_tx, net_rx, jh_ca_search) = ca_search_workers_start(&opts).await?;
let jh2 = taskrun::spawn(process_net_result(net_rx, tx.clone(), opts.clone()));
'outer: while let Ok(item) = rx.recv().await {
let mut res = VecDeque::new();

View File

@@ -9,13 +9,11 @@ use log::*;
use proto::CaMsg;
use proto::CaMsgTy;
use proto::HeadInfo;
use stats::IocFinderStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::Ipv4Addr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::task::Context;
@@ -120,7 +118,6 @@ pub struct FindIocStream {
thr_msg_1: ThrottleTrace,
#[allow(unused)]
thr_msg_2: ThrottleTrace,
stats: Arc<IocFinderStats>,
}
impl FindIocStream {
@@ -131,7 +128,6 @@ impl FindIocStream {
batch_run_max: Duration,
in_flight_max: usize,
batch_size: usize,
stats: Arc<IocFinderStats>,
) -> Self {
let sock = unsafe { Self::create_socket() }.unwrap();
let afd = AsyncFd::new(sock.0).unwrap();
@@ -159,7 +155,6 @@ impl FindIocStream {
thr_msg_0: ThrottleTrace::new(Duration::from_millis(1000)),
thr_msg_1: ThrottleTrace::new(Duration::from_millis(1000)),
thr_msg_2: ThrottleTrace::new(Duration::from_millis(1000)),
stats,
}
}
@@ -284,10 +279,7 @@ impl FindIocStream {
Poll::Ready(Ok(()))
}
unsafe fn try_read(
sock: i32,
stats: &IocFinderStats,
) -> Poll<Result<(SocketAddrV4, Vec<(SearchId, SocketAddrV4)>), Error>> {
unsafe fn try_read(sock: i32) -> Poll<Result<(SocketAddrV4, Vec<(SearchId, SocketAddrV4)>), Error>> {
let tsnow = Instant::now();
let mut saddr_mem = [0u8; std::mem::size_of::<libc::sockaddr>()];
let mut saddr_len: libc::socklen_t = saddr_mem.len() as _;
@@ -310,14 +302,14 @@ impl FindIocStream {
return Poll::Ready(Err(Error::ReadFailure));
}
} else if ec < 0 {
stats.ca_udp_io_error().inc();
// stats.ca_udp_io_error().inc();
error!("unexpected received {ec}");
Poll::Ready(Err(Error::ReadFailure))
} else if ec == 0 {
stats.ca_udp_io_empty().inc();
// stats.ca_udp_io_empty().inc();
Poll::Ready(Err(Error::ReadEmpty))
} else {
stats.ca_udp_io_recv().inc();
// stats.ca_udp_io_recv().inc();
let saddr2: libc::sockaddr_in = unsafe { std::mem::transmute_copy(&saddr_mem) };
let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes());
let src_port = u16::from_be(saddr2.sin_port);
@@ -366,15 +358,15 @@ impl FindIocStream {
accounted += 16 + hi.payload_len();
}
if accounted != ec as u32 {
stats.ca_udp_unaccounted_data().inc();
// stats.ca_udp_unaccounted_data().inc();
debug!("unaccounted data ec {} accounted {}", ec, accounted);
}
if msgs.len() < 1 {
stats.ca_udp_warn().inc();
// stats.ca_udp_warn().inc();
debug!("received answer without messages");
}
if msgs.len() == 1 {
stats.ca_udp_warn().inc();
// stats.ca_udp_warn().inc();
debug!("received answer with single message: {msgs:?}");
}
let mut good = true;
@@ -384,7 +376,7 @@ impl FindIocStream {
good = false;
}
} else {
stats.ca_udp_first_msg_not_version().inc();
// stats.ca_udp_first_msg_not_version().inc();
}
// trace2!("recv {:?} {:?}", src_addr, msgs);
let mut res = Vec::new();
@@ -398,7 +390,7 @@ impl FindIocStream {
res.push((SearchId(k.id), addr));
}
_ => {
stats.ca_udp_error().inc();
// stats.ca_udp_error().inc();
warn!("try_read: unknown message received {:?}", msg.ty);
}
}
@@ -449,7 +441,7 @@ impl FindIocStream {
};
self.in_flight.insert(bid.clone(), batch);
self.batch_send_queue.push_back(bid);
self.stats.ca_udp_batch_created().inc();
// stats.ca_udp_batch_created().inc();
}
fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) {
@@ -477,11 +469,11 @@ impl FindIocStream {
dt,
};
// trace!("udp search response {res:?}");
self.stats.ca_udp_recv_result().inc();
// stats.ca_udp_recv_result().inc();
self.out_queue.push_back(res);
}
None => {
self.stats.ca_udp_logic_error().inc();
// stats.ca_udp_logic_error().inc();
error!(
"logic error batch sids / channels lens: {} vs {}",
batch.sids.len(),
@@ -537,7 +529,7 @@ impl FindIocStream {
sids.push(sid.clone());
chns.push(batch.channels[i2].clone());
dts.push(dt);
self.stats.ca_udp_recv_timeout().inc();
// stats.ca_udp_recv_timeout().inc();
}
}
bids.push(bid.clone());
@@ -692,7 +684,7 @@ impl Stream for FindIocStream {
break match self.afd.poll_read_ready(cx) {
Ready(Ok(mut g)) => {
// debug!("BLOCK AA");
match unsafe { Self::try_read(self.sock.0, &self.stats) } {
match unsafe { Self::try_read(self.sock.0) } {
Ready(Ok((src, res))) => {
self.handle_result(src, res);
if self.ready_for_end_of_stream() {

View File

@@ -4,12 +4,10 @@ use async_channel::Receiver;
use async_channel::Sender;
use futures_util::StreamExt;
use log::*;
use stats::IocFinderStats;
use std::collections::VecDeque;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::sync::Arc;
use std::time::Duration;
use taskrun::tokio;
use tokio::task::JoinHandle;
@@ -59,7 +57,6 @@ async fn resolve_address(addr_str: &str) -> Result<SocketAddr, Error> {
pub async fn ca_search_workers_start(
opts: &CaIngestOpts,
stats: Arc<IocFinderStats>,
) -> Result<
(
Sender<String>,
@@ -72,7 +69,7 @@ pub async fn ca_search_workers_start(
let batch_run_max = Duration::from_millis(800);
let (inp_tx, inp_rx) = async_channel::bounded(256);
let (out_tx, out_rx) = async_channel::bounded(256);
let finder = FindIocStream::new(inp_rx, search_tgts, blacklist, batch_run_max, 20, 16, stats);
let finder = FindIocStream::new(inp_rx, search_tgts, blacklist, batch_run_max, 20, 16);
let jh = taskrun::spawn(finder_run(finder, out_tx));
Ok((inp_tx, out_rx, jh))
}

View File

@@ -33,11 +33,6 @@ use scywr::iteminsertqueue::QueryItem;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use stats::CaProtoStats;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::IocFinderStats;
use stats::SeriesByChannelStats;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::SocketAddr;
@@ -128,28 +123,12 @@ impl IntoResponse for CustomErrorResponse {
#[derive(Clone)]
pub struct StatsSet {
daemon: Arc<DaemonStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
ioc_finder_stats: Arc<IocFinderStats>,
insert_frac: Arc<AtomicU64>,
}
impl StatsSet {
pub fn new(
daemon: Arc<DaemonStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
ioc_finder_stats: Arc<IocFinderStats>,
insert_frac: Arc<AtomicU64>,
) -> Self {
Self {
daemon,
insert_worker_stats,
series_by_channel_stats,
ioc_finder_stats,
insert_frac,
}
pub fn new(insert_frac: Arc<AtomicU64>) -> Self {
Self { insert_frac }
}
}
@@ -340,23 +319,12 @@ impl DaemonComm {
fn metricbeat(stats_set: &StatsSet) -> axum::Json<serde_json::Value> {
let mut map = serde_json::Map::new();
map.insert("daemon".to_string(), stats_set.daemon.json());
map.insert("insert_worker_stats".to_string(), stats_set.insert_worker_stats.json());
// map.insert("insert_worker_stats".to_string(), stats_set.insert_worker_stats.json());
let mut ret = serde_json::Map::new();
ret.insert("daqingest".to_string(), serde_json::Value::Object(map));
axum::Json(serde_json::Value::Object(ret))
}
fn metrics(stats_set: &StatsSet) -> String {
let ss = [
stats_set.daemon.prometheus(),
stats_set.insert_worker_stats.prometheus(),
stats_set.series_by_channel_stats.prometheus(),
stats_set.ioc_finder_stats.prometheus(),
];
ss.join("")
}
pub struct RoutesResources {
backend: String,
worker_tx: Sender<ChannelInfoQuery>,
@@ -427,10 +395,8 @@ fn make_routes(
let dcom = dcom.clone();
let stats_set = stats_set.clone();
|| async move {
let prom2 = metrics2(dcom).await.unwrap_or(String::new());
let mut s = metrics(&stats_set);
s.push_str(&prom2);
s
let prom = metrics2(dcom).await.unwrap_or(String::new());
prom
}
}),
),

View File

@@ -1,7 +1,7 @@
use crate::config::ScyllaIngestConfig;
use crate::iteminsertqueue::Accounting;
use crate::iteminsertqueue::AccountingRecv;
use crate::iteminsertqueue::BinWriteIndexV03;
use crate::iteminsertqueue::BinWriteIndexV04;
use crate::iteminsertqueue::InsertFut;
use crate::iteminsertqueue::InsertItem;
use crate::iteminsertqueue::MspItem;
@@ -20,7 +20,6 @@ use log;
use netpod::ttl::RetentionTime;
use smallvec::SmallVec;
use smallvec::smallvec;
use stats::InsertWorkerStats;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
@@ -53,22 +52,25 @@ autoerr::create_error_v1!(
},
);
fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqueue::InsertFutError) {
fn stats_inc_for_err(err: &crate::iteminsertqueue::InsertFutError) {
use crate::iteminsertqueue::InsertFutError;
match err {
InsertFutError::Execution(e) => match e {
scylla::errors::ExecutionError::RequestTimeout(_) => {
stats.db_timeout().inc();
// TODO
// stats.db_timeout().inc();
}
_ => {
if true {
warn!("db error {}", err);
}
stats.db_error().inc();
// TODO
// stats.db_error().inc();
}
},
InsertFutError::NoFuture => {
stats.logic_error().inc();
// TODO
// stats.logic_error().inc();
}
}
}
@@ -108,7 +110,6 @@ pub async fn spawn_scylla_insert_workers(
insert_worker_concurrency: usize,
item_inp: Receiver<VecDeque<QueryItem>>,
insert_worker_opts: Arc<InsertWorkerOpts>,
store_stats: Arc<stats::InsertWorkerStats>,
use_rate_limit_queue: bool,
ignore_writes: bool,
tx: Sender<InsertWorkerOutputItem>,
@@ -133,7 +134,6 @@ pub async fn spawn_scylla_insert_workers(
insert_worker_opts.clone(),
Some(data_store),
ignore_writes,
store_stats.clone(),
tx.clone(),
));
jhs.push(jh);
@@ -146,7 +146,6 @@ pub async fn spawn_scylla_insert_workers_dummy(
insert_worker_concurrency: usize,
item_inp: Receiver<VecDeque<QueryItem>>,
insert_worker_opts: Arc<InsertWorkerOpts>,
store_stats: Arc<stats::InsertWorkerStats>,
tx: Sender<InsertWorkerOutputItem>,
) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
let mut jhs = Vec::new();
@@ -159,7 +158,6 @@ pub async fn spawn_scylla_insert_workers_dummy(
insert_worker_opts.clone(),
data_store,
true,
store_stats.clone(),
tx.clone(),
));
jhs.push(jh);
@@ -214,7 +212,6 @@ async fn worker_streamed(
insert_worker_opts: Arc<InsertWorkerOpts>,
data_store: Option<Arc<DataStore>>,
ignore_writes: bool,
stats: Arc<InsertWorkerStats>,
tx: Sender<InsertWorkerOutputItem>,
) -> Result<(), Error> {
debug_setup!("worker_streamed begin");
@@ -231,7 +228,7 @@ async fn worker_streamed(
.map_or_else(|| format!("dummy"), |x| x.rett.debug_tag().to_string());
let stream = inspect_items(stream, worker_name.clone());
if let Some(data_store) = data_store {
let stream = transform_to_db_futures(stream, data_store, ignore_writes, stats.clone());
let stream = transform_to_db_futures(stream, data_store, ignore_writes);
let stream = stream
.map(|x| futures_util::stream::iter(x))
.flatten_unordered(Some(1))
@@ -254,7 +251,7 @@ async fn worker_streamed(
}
Err(e) => {
mett.job_err().inc();
stats_inc_for_err(&stats, &e);
stats_inc_for_err(&e);
}
}
if mett_emit_last + metrics_ivl <= tsnow {
@@ -276,7 +273,8 @@ async fn worker_streamed(
drop(item);
}
};
stats.worker_finish().inc();
// TODO
// stats.worker_finish().inc();
insert_worker_opts
.insert_workers_running
.fetch_sub(1, atomic::Ordering::AcqRel);
@@ -293,7 +291,6 @@ fn transform_to_db_futures<S>(
item_inp: S,
data_store: Arc<DataStore>,
ignore_writes: bool,
stats: Arc<InsertWorkerStats>,
) -> impl Stream<Item = Vec<FutJob>>
where
S: Stream<Item = VecDeque<QueryItem>>,
@@ -302,7 +299,8 @@ where
// TODO possible without box?
// let item_inp = Box::pin(item_inp);
item_inp.map(move |batch| {
stats.item_recv.inc();
// TODO
// stats.item_recv.inc();
trace_transform!("transform_to_db_futures have batch len {}", batch.len());
let tsnow = Instant::now();
let mut res = Vec::with_capacity(32);
@@ -329,11 +327,11 @@ where
prepare_timebin_v02_insert_futs(item, &data_store, tsnow)
}
}
QueryItem::BinWriteIndexV03(item) => {
QueryItem::BinWriteIndexV04(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_bin_write_index_v03_insert_futs(item, &data_store, tsnow)
prepare_bin_write_index_v04_insert_futs(item, &data_store, tsnow)
}
}
QueryItem::Accounting(item) => {
@@ -377,8 +375,8 @@ fn inspect_items(
QueryItem::TimeBinSimpleF32V02(_) => {
trace_item_execute!("execute {worker_name} TimeBinSimpleF32V02");
}
QueryItem::BinWriteIndexV03(_) => {
trace_item_execute!("execute {worker_name} BinWriteIndexV03");
QueryItem::BinWriteIndexV04(_) => {
trace_item_execute!("execute {worker_name} BinWriteIndexV04");
}
QueryItem::Accounting(_) => {
trace_item_execute!("execute {worker_name} Accounting {item:?}");
@@ -458,15 +456,15 @@ fn prepare_timebin_v02_insert_futs(
futs
}
fn prepare_bin_write_index_v03_insert_futs(
item: BinWriteIndexV03,
fn prepare_bin_write_index_v04_insert_futs(
item: BinWriteIndexV04,
data_store: &Arc<DataStore>,
tsnow: Instant,
) -> SmallVec<[FutJob; 4]> {
let params = (item.series, item.pbp, item.msp, item.rt, item.lsp, item.binlen);
let params = (item.series, item.pbp, item.msp, item.lsp, item.binlen);
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_bin_write_index_v03.clone(),
data_store.qu_insert_bin_write_index_v04.clone(),
params,
);
let fut = FutJob { fut, ts_net: tsnow };

View File

@@ -554,11 +554,10 @@ pub struct TimeBinSimpleF32V02 {
}
#[derive(Debug, Clone)]
pub struct BinWriteIndexV03 {
pub struct BinWriteIndexV04 {
pub series: i64,
pub pbp: i16,
pub msp: i32,
pub rt: i16,
pub lsp: i32,
pub binlen: i32,
}
@@ -569,7 +568,7 @@ pub enum QueryItem {
Insert(InsertItem),
Msp(MspItem),
TimeBinSimpleF32V02(TimeBinSimpleF32V02),
BinWriteIndexV03(BinWriteIndexV03),
BinWriteIndexV04(BinWriteIndexV04),
Accounting(Accounting),
AccountingRecv(AccountingRecv),
}

View File

@@ -615,17 +615,16 @@ async fn migrate_scylla_data_schema(
let tab = GenTwcsTab::new(
ks,
rett.table_prefix(),
"bin_write_index_v03",
"bin_write_index_v04",
&[
("series", "bigint"),
("pbp", "smallint"),
("msp", "int"),
("rt", "smallint"),
("lsp", "int"),
("binlen", "int"),
],
["series", "pbp", "msp"],
["rt", "lsp", "binlen"],
["lsp", "binlen"],
rett.ttl_binned(),
);
tab.setup(chs, scy).await?;
@@ -707,6 +706,12 @@ async fn migrate_scylla_data_schema(
chs.add_todo(format!("drop table {}.{}", ks, tn));
}
}
{
let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v03");
if has_table(&ks, &tn, scy).await? {
chs.add_todo(format!("drop table {}.{}", ks, tn));
}
}
Ok(())
}

View File

@@ -45,7 +45,7 @@ pub struct DataStore {
pub qu_insert_array_f64: Arc<PreparedStatement>,
pub qu_insert_array_bool: Arc<PreparedStatement>,
pub qu_insert_binned_scalar_f32_v02: Arc<PreparedStatement>,
pub qu_insert_bin_write_index_v03: Arc<PreparedStatement>,
pub qu_insert_bin_write_index_v04: Arc<PreparedStatement>,
pub qu_account_00: Arc<PreparedStatement>,
pub qu_account_recv_00: Arc<PreparedStatement>,
pub qu_dummy: Arc<PreparedStatement>,
@@ -157,10 +157,10 @@ impl DataStore {
scy
);
let qu_insert_bin_write_index_v03 = prep_qu_ins_c!(
"bin_write_index_v03",
"series, pbp, msp, rt, lsp, binlen",
"?, ?, ?, ?, ?, ?",
let qu_insert_bin_write_index_v04 = prep_qu_ins_c!(
"bin_write_index_v04",
"series, pbp, msp, lsp, binlen",
"?, ?, ?, ?, ?",
rett,
scy
);
@@ -219,7 +219,7 @@ impl DataStore {
qu_insert_array_f64,
qu_insert_array_bool,
qu_insert_binned_scalar_f32_v02,
qu_insert_bin_write_index_v03,
qu_insert_bin_write_index_v04,
qu_account_00,
qu_account_recv_00,
qu_dummy,

View File

@@ -13,7 +13,7 @@ use netpod::Shape;
use netpod::TsNano;
use netpod::ttl::RetentionTime;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::BinWriteIndexV03;
use scywr::iteminsertqueue::BinWriteIndexV04;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V02;
use serde::Serialize;
@@ -104,6 +104,28 @@ impl IndexWritten {
}
}
#[derive(Debug, Serialize)]
struct BinnerStateA {
rt: RetentionTime,
binner: BinnedEventsTimeweight<f32>,
write_zero: WriteCntZero,
pbp: PrebinnedPartitioning,
index_written_1: IndexWritten,
index_written_2: Option<IndexWritten>,
discard_front: u8,
}
#[derive(Debug, Serialize)]
struct BinnerStateB {
rt: RetentionTime,
binner: BinnedBinsTimeweight<f32, f32>,
write_zero: WriteCntZero,
pbp: PrebinnedPartitioning,
index_written_1: IndexWritten,
index_written_2: Option<IndexWritten>,
discard_front: u8,
}
#[derive(Debug, Serialize)]
pub struct BinWriter {
chname: String,
@@ -112,22 +134,8 @@ pub struct BinWriter {
scalar_type: ScalarType,
shape: Shape,
evbuf: ContainerEvents<f32>,
binner_1st: Option<(
RetentionTime,
BinnedEventsTimeweight<f32>,
WriteCntZero,
PrebinnedPartitioning,
IndexWritten,
Option<IndexWritten>,
)>,
binner_others: Vec<(
RetentionTime,
BinnedBinsTimeweight<f32, f32>,
WriteCntZero,
PrebinnedPartitioning,
IndexWritten,
Option<IndexWritten>,
)>,
binner_1st: Option<BinnerStateA>,
binner_others: Vec<BinnerStateB>,
trd: bool,
}
@@ -242,7 +250,16 @@ impl BinWriter {
} else {
None
};
binner_1st = Some((rt, binner, write_zero, pbp, IndexWritten::new(), iw2));
let st = BinnerStateA {
rt,
binner,
write_zero,
pbp,
index_written_1: IndexWritten::new(),
index_written_2: iw2,
discard_front: 0,
};
binner_1st = Some(st);
} else {
let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len());
let mut binner = BinnedBinsTimeweight::new(range);
@@ -254,7 +271,16 @@ impl BinWriter {
} else {
None
};
binner_others.push((rt, binner, write_zero, pbp, IndexWritten::new(), iw2));
let st = BinnerStateB {
rt,
binner,
write_zero,
pbp,
index_written_1: IndexWritten::new(),
index_written_2: iw2,
discard_front: 0,
};
binner_others.push(st);
}
}
let ret = Self {
@@ -322,17 +348,11 @@ impl BinWriter {
fn tick_ingest_and_handle(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
let buf = &self.evbuf;
if let Some(ee) = self.binner_1st.as_mut() {
let rt = ee.0.clone();
let write_zero = ee.2.clone();
let binner = &mut ee.1;
let pbp = ee.3.clone();
let index_written = &mut ee.4;
let iw2 = &mut ee.5;
if let Some(st) = self.binner_1st.as_mut() {
// TODO avoid boxing
let bufbox = Box::new(buf);
use items_0::timebin::IngestReport;
let consumed_evs = match binner.ingest(&bufbox)? {
let consumed_evs = match st.binner.ingest(&bufbox)? {
IngestReport::ConsumedAll => {
let n = bufbox.len();
self.evbuf.clear();
@@ -343,28 +363,28 @@ impl BinWriter {
n
}
};
let bins = binner.output();
let bins = st.binner.output();
if bins.len() > 0 {
trace_bin!(self.trd, "binner_1st out len {}", bins.len());
Self::handle_output_ready(
self.trd,
true,
self.sid,
rt,
st.rt.clone(),
&bins,
write_zero,
index_written,
iw2,
pbp,
st.write_zero.clone(),
&mut st.index_written_1,
&mut st.index_written_2,
st.pbp.clone(),
&mut st.discard_front,
iqdqs,
)?;
// TODO avoid boxing
let mut bins2: BinsBoxed = Box::new(bins);
for i in 0..self.binner_others.len() {
let (rt, binner, write_zero, pbp, index_written, iw2) = &mut self.binner_others[i];
let write_zero = write_zero.clone();
binner.ingest(&bins2)?;
let bb: Option<BinsBoxed> = binner.output()?;
let st = &mut self.binner_others[i];
st.binner.ingest(&bins2)?;
let bb: Option<BinsBoxed> = st.binner.output()?;
match bb {
Some(bb) => {
if bb.len() > 0 {
@@ -374,12 +394,13 @@ impl BinWriter {
self.trd,
false,
self.sid,
rt.clone(),
st.rt.clone(),
&bb2,
write_zero,
index_written,
iw2,
pbp.clone(),
st.write_zero.clone(),
&mut st.index_written_1,
&mut st.index_written_2,
st.pbp.clone(),
&mut st.discard_front,
iqdqs,
)?;
} else {
@@ -419,6 +440,7 @@ impl BinWriter {
iw1: &mut IndexWritten,
iw2: &mut Option<IndexWritten>,
pbp: PrebinnedPartitioning,
discard_front: &mut u8,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
let selfname = "handle_output_ready";
@@ -446,80 +468,81 @@ impl BinWriter {
let e = Error::UnexpectedBinLen(bin_len, pbp);
return Err(e);
}
{
let (msp, lsp) = pbp.msp_lsp(ts1.to_ts_ms());
let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 {
series,
binlen: bin_len.ms() as i32,
msp: msp as i64,
off: lsp as i32,
cnt: cnt as i64,
min,
max,
avg,
dev: f32::NAN,
lst,
});
if true || bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) {
debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item);
if *discard_front < 1 {
*discard_front += 1;
} else {
{
let (msp, lsp) = pbp.msp_lsp(ts1.to_ts_ms());
let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 {
series,
binlen: bin_len.ms() as i32,
msp: msp as i64,
off: lsp as i32,
cnt: cnt as i64,
min,
max,
avg,
dev: f32::NAN,
lst,
});
if true || bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) {
debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item);
}
let qu = iqdqs.deque(rt.clone());
qu.push_back(item);
}
let qu = iqdqs.deque(rt.clone());
qu.push_back(item);
}
if pbp.uses_index_min10() {
let pbp_ix = PrebinnedPartitioning::Min10;
let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms());
debug_bin!(
trd,
"handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}",
series,
pbp_ix,
pbp,
rt,
msp,
lsp
);
let iw = iw2.as_mut().unwrap();
if iw.should_write(msp, lsp) {
iw.mark_written(msp, lsp);
let item = BinWriteIndexV03 {
series: series.id() as i64,
pbp: pbp_ix.db_ix() as i16,
msp: msp as i32,
rt: rt.to_index_db_i32() as i16,
lsp: lsp as i32,
binlen: pbp.bin_len().ms() as i32,
};
let item = QueryItem::BinWriteIndexV03(item);
iqdqs.deque(rt.clone()).push_back(item);
if pbp.uses_index_min10() {
let pbp_ix = PrebinnedPartitioning::Min10;
let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms());
debug_bin!(
trd,
"handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}",
series,
pbp_ix,
pbp,
rt,
msp,
lsp
);
let iw = iw2.as_mut().unwrap();
if iw.should_write(msp, lsp) {
iw.mark_written(msp, lsp);
let item = BinWriteIndexV04 {
series: series.id() as i64,
pbp: pbp_ix.db_ix() as i16,
msp: msp as i32,
lsp: lsp as i32,
binlen: pbp.bin_len().ms() as i32,
};
let item = QueryItem::BinWriteIndexV04(item);
iqdqs.deque(rt.clone()).push_back(item);
}
}
}
if true {
let pbp_ix = PrebinnedPartitioning::Day1;
let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms());
debug_bin!(
trd,
"handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}",
series,
pbp_ix,
pbp,
rt,
msp,
lsp
);
// let iw = iw1;
if iw1.should_write(msp, lsp) {
iw1.mark_written(msp, lsp);
let item = BinWriteIndexV03 {
series: series.id() as i64,
pbp: pbp_ix.db_ix() as i16,
msp: msp as i32,
rt: rt.to_index_db_i32() as i16,
lsp: lsp as i32,
binlen: pbp.bin_len().ms() as i32,
};
let item = QueryItem::BinWriteIndexV03(item);
iqdqs.deque(rt.clone()).push_back(item);
if true {
let pbp_ix = PrebinnedPartitioning::Day1;
let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms());
debug_bin!(
trd,
"handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}",
series,
pbp_ix,
pbp,
rt,
msp,
lsp
);
if iw1.should_write(msp, lsp) {
iw1.mark_written(msp, lsp);
let item = BinWriteIndexV04 {
series: series.id() as i64,
pbp: pbp_ix.db_ix() as i16,
msp: msp as i32,
lsp: lsp as i32,
binlen: pbp.bin_len().ms() as i32,
};
let item = QueryItem::BinWriteIndexV04(item);
iqdqs.deque(rt.clone()).push_back(item);
}
}
}
}

View File

@@ -137,6 +137,29 @@ mod Metrics {
}
}
// mod Metrics {
// type StructName = IocFinderMetrics;
// enum counters {
// dbsearcher_batch_recv,
// dbsearcher_item_recv,
// dbsearcher_select_res_0,
// dbsearcher_select_error_len_mismatch,
// dbsearcher_batch_send,
// dbsearcher_item_send,
// ca_udp_error,
// ca_udp_warn,
// ca_udp_unaccounted_data,
// ca_udp_batch_created,
// ca_udp_io_error,
// ca_udp_io_empty,
// ca_udp_io_recv,
// ca_udp_first_msg_not_version,
// ca_udp_recv_result,
// ca_udp_recv_timeout,
// ca_udp_logic_error,
// }
// }
mod Metrics {
type StructName = DaemonMetrics;
mod Compose {

View File

@@ -2,15 +2,9 @@ pub mod mett;
pub use rand_xoshiro;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
const US: u64 = 1000;
const MS: u64 = US * 1000;
const SEC: u64 = MS * 1000;
pub type EMA = Ema32;
#[derive(Clone, Debug)]
@@ -283,126 +277,6 @@ impl XorShift32 {
}
}
stats_proc::stats_struct!((
stats_struct(
name(DaemonStats),
prefix(daemon),
counters(asdasd,),
values(
channel_unknown_address,
channel_search_pending,
channel_with_address,
channel_no_address,
connset_health_lat_ema,
// iqtx_len_st_rf1,
iqtx_len_st_rf3,
iqtx_len_mt_rf3,
iqtx_len_lt_rf3,
iqtx_len_lt_rf3_lat5,
),
),
agg(name(DaemonStatsAgg), parent(DaemonStats)),
diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)),
stats_struct(
name(CaProtoStats),
prefix(ca_proto),
counters(
// tcp_recv_count,
// tcp_recv_bytes,
protocol_issue,
payload_std_too_large,
payload_ext_but_small,
payload_ext_very_large,
out_msg_placed,
out_bytes,
),
histolog2s(payload_size, data_count, outbuf_len,),
),
stats_struct(
name(SeriesByChannelStats),
prefix(seriesbychannel),
counters(res_tx_fail, res_tx_timeout, recv_batch, recv_items,),
histolog2s(commit_duration_ms),
),
stats_struct(
name(InsertWorkerStats),
prefix(insert_worker),
counters(
logic_error,
item_recv,
inserted_values,
inserted_connection_status,
inserted_channel_status,
fraction_drop,
inserted_mute,
inserted_interval,
inserted_channel_info,
inserted_binned,
db_overload,
db_timeout,
db_unavailable,
db_error,
query_error,
inserts_msp,
inserts_msp_grid,
inserts_value,
ratelimit_drop,
worker_start,
worker_finish,
),
histolog2s(item_lat_net_worker, item_lat_net_store,),
),
stats_struct(
name(IocFinderStats),
prefix(ioc_finder),
counters(
dbsearcher_batch_recv,
dbsearcher_item_recv,
dbsearcher_select_res_0,
dbsearcher_select_error_len_mismatch,
dbsearcher_batch_send,
dbsearcher_item_send,
ca_udp_error,
ca_udp_warn,
ca_udp_unaccounted_data,
ca_udp_batch_created,
ca_udp_io_error,
ca_udp_io_empty,
ca_udp_io_recv,
ca_udp_first_msg_not_version,
ca_udp_recv_result,
ca_udp_recv_timeout,
ca_udp_logic_error,
),
values(db_lookup_workers,)
),
stats_struct(
name(SeriesWriterEstablishStats),
prefix(wrest),
counters(job_recv, result_send_fail,),
),
));
stats_proc::stats_struct!((
stats_struct(name(TestStats0), counters(count0,), values(val0),),
diff(name(TestStats0Diff), input(TestStats0)),
agg(name(TestStats0Agg), parent(TestStats0)),
diff(name(TestStats0AggDiff), input(TestStats0Agg)),
));
#[test]
fn test0_diff() {
let stats_a = TestStats0::new();
stats_a.count0().inc();
stats_a.val0().set(43);
let stats_b = stats_a.snapshot();
stats_b.count0().inc();
stats_b.count0().inc();
stats_b.count0().inc();
let diff = TestStats0Diff::diff_from(&stats_a, &stats_b);
assert_eq!(diff.count0.load(), 3);
}
pub fn xoshiro_from_time() -> rand_xoshiro::Xoshiro128PlusPlus {
use rand_xoshiro::rand_core::SeedableRng;
use std::time::SystemTime;