diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 29d0267..b558a63 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.3.0-aa.2" +version = "0.3.0-aa.3" authors = ["Dominik Werder "] edition = "2024" diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 15c866c..8fcd0cd 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -162,6 +162,10 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), Error> { let opstr = if do_change { "change" } else { "check" }; info!("start scylla schema {}", opstr); + info!("{:?}", opts.scylla_config_st()); + info!("{:?}", opts.scylla_config_mt()); + info!("{:?}", opts.scylla_config_lt()); + info!("{:?}", opts.scylla_config_st_rf1()); scywr::schema::migrate_scylla_data_schema_all_rt( [ &opts.scylla_config_st(), diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 20b9e17..8644f7e 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -23,9 +23,6 @@ use scywr::config::ScyllaIngestConfig; use scywr::insertqueues::InsertQueuesRx; use scywr::insertqueues::InsertQueuesTx; use scywr::insertworker::InsertWorkerOpts; -use stats::DaemonStats; -use stats::InsertWorkerStats; -use stats::SeriesByChannelStats; use stats::rand_xoshiro::rand_core::RngCore; use std::sync::Arc; use std::sync::atomic; @@ -68,9 +65,6 @@ pub struct Daemon { count_assigned: usize, last_status_print: SystemTime, insert_workers_jhs: Vec>>, - stats: Arc, - insert_worker_stats: Arc, - series_by_channel_stats: Arc, shutting_down: bool, connset_ctrl: CaConnSetCtrl, connset_status_last: Instant, @@ -90,15 +84,11 @@ impl Daemon { pub async fn new(opts: DaemonOpts, ingest_opts: CaIngestOpts) -> Result { let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32); - let series_by_channel_stats = Arc::new(SeriesByChannelStats::new()); - let insert_worker_stats = Arc::new(InsertWorkerStats::new()); - // TODO keep join handles and await later - let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers::< - dbpg::seriesbychannel::SalterRandom, - >(2, &opts.pgconf, series_by_channel_stats.clone()) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let (channel_info_query_tx, jhs, jh) = + dbpg::seriesbychannel::start_lookup_workers::(2, &opts.pgconf) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; // TODO so far a dummy let (series_conf_by_id_tx, _series_conf_by_id_rx) = async_channel::bounded(16); @@ -207,7 +197,6 @@ impl Daemon { ingest_opts.insert_worker_concurrency(), iqrx.st_rf1_rx, insert_worker_opts.clone(), - insert_worker_stats.clone(), insert_worker_output_tx.clone(), ) .await @@ -218,7 +207,6 @@ impl Daemon { ingest_opts.insert_worker_concurrency(), iqrx.st_rf3_rx, insert_worker_opts.clone(), - insert_worker_stats.clone(), insert_worker_output_tx.clone(), ) .await @@ -229,7 +217,6 @@ impl Daemon { ingest_opts.insert_worker_concurrency(), iqrx.mt_rf3_rx, insert_worker_opts.clone(), - insert_worker_stats.clone(), insert_worker_output_tx.clone(), ) .await @@ -240,7 +227,6 @@ impl Daemon { ingest_opts.insert_worker_concurrency(), iqrx.lt_rf3_rx, insert_worker_opts.clone(), - insert_worker_stats.clone(), insert_worker_output_tx.clone(), ) .await @@ -251,7 +237,6 @@ impl Daemon { ingest_opts.insert_worker_concurrency(), iqrx.lt_rf3_lat5_rx, insert_worker_opts.clone(), - insert_worker_stats.clone(), insert_worker_output_tx.clone(), ) .await @@ -266,7 +251,6 @@ impl Daemon { ingest_opts.insert_worker_concurrency(), iqrx.st_rf1_rx, insert_worker_opts.clone(), - insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ignore_writes, insert_worker_output_tx.clone(), @@ -283,7 +267,6 @@ impl Daemon { ingest_opts.insert_worker_concurrency(), iqrx.st_rf3_rx, insert_worker_opts.clone(), - insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ignore_writes, insert_worker_output_tx.clone(), @@ -300,7 +283,6 @@ impl Daemon { ingest_opts.insert_worker_concurrency(), iqrx.mt_rf3_rx, insert_worker_opts.clone(), - insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ignore_writes, insert_worker_output_tx.clone(), @@ -319,7 +301,6 @@ impl Daemon { ingest_opts.insert_worker_concurrency(), lt_rx_combined, insert_worker_opts.clone(), - insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ignore_writes, insert_worker_output_tx.clone(), @@ -328,7 +309,6 @@ impl Daemon { .map_err(Error::from_string)?; insert_worker_jhs.extend(jh); }; - let stats = Arc::new(DaemonStats::new()); #[cfg(feature = "bsread")] if let Some(bsaddr) = &opts.test_bsread_addr { @@ -402,9 +382,6 @@ impl Daemon { count_assigned: 0, last_status_print: SystemTime::now(), insert_workers_jhs: insert_worker_jhs, - stats, - insert_worker_stats, - series_by_channel_stats, shutting_down: false, connset_ctrl: conn_set_ctrl, connset_status_last: Instant::now(), @@ -420,10 +397,6 @@ impl Daemon { Ok(ret) } - fn stats(&self) -> &Arc { - &self.stats - } - async fn check_health(&mut self, ts1: Instant) -> Result<(), Error> { self.check_health_connset(ts1)?; Ok(()) @@ -862,7 +835,6 @@ impl Daemon { pub async fn spawn_metrics(&mut self) -> Result<(), Error> { let tx = self.tx.clone(); - let daemon_stats = self.stats().clone(); let connset_cmd_tx = self.connset_ctrl.sender().clone(); let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone())); let rres = RoutesResources::new( @@ -880,13 +852,7 @@ impl Daemon { ); let rres = Arc::new(rres); let metrics_jh = { - let stats_set = StatsSet::new( - daemon_stats, - self.insert_worker_stats.clone(), - self.series_by_channel_stats.clone(), - self.connset_ctrl.ioc_finder_stats().clone(), - self.opts.insert_frac.clone(), - ); + let stats_set = StatsSet::new(self.opts.insert_frac.clone()); let fut = netfetch::metrics::metrics_service( self.ingest_opts.api_bind(), dcom, diff --git a/daqingest/src/tools/catools.rs b/daqingest/src/tools/catools.rs index 73458a9..ddf5290 100644 --- a/daqingest/src/tools/catools.rs +++ b/daqingest/src/tools/catools.rs @@ -1,9 +1,5 @@ use crate::opts::CaFind; -use err::thiserror; -use err::ThisError; use futures_util::StreamExt; -use stats::IocFinderStats; -use std::sync::Arc; use std::time::Duration; autoerr::create_error_v1!( @@ -26,7 +22,6 @@ pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> { let batch_run_max = Duration::from_millis(1200); let in_flight_max = 1; let batch_size = 1; - let stats = Arc::new(IocFinderStats::new()); channels_input_tx.send(cmd.channel).await.unwrap(); let stream = netfetch::ca::findioc::FindIocStream::new( channels_input_rx, @@ -35,7 +30,6 @@ pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> { batch_run_max, in_flight_max, batch_size, - stats, ); let deadline = taskrun::tokio::time::sleep(Duration::from_millis(2000)); let mut stream = Box::pin(stream.take_until(deadline)); diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 26b0f67..65071b3 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -15,7 +15,6 @@ use netpod::SeriesKind; use netpod::Shape; use serde::Serialize; use series::SeriesId; -use stats::SeriesByChannelStats; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -144,16 +143,11 @@ struct Worker { qu_select: PgStatement, qu_insert: PgStatement, batch_rx: Receiver>, - stats: Arc, pg_client_jh: JoinHandle>, } impl Worker { - async fn new( - db: &Database, - batch_rx: Receiver>, - stats: Arc, - ) -> Result { + async fn new(db: &Database, batch_rx: Receiver>) -> Result { use tokio_postgres::types::Type; debug!("Worker make_pg_client"); let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?; @@ -206,7 +200,6 @@ impl Worker { qu_select, qu_insert, batch_rx, - stats, pg_client_jh, }; Ok(ret) @@ -215,8 +208,9 @@ impl Worker { async fn work(&mut self) -> Result<(), Error> { let batch_rx = self.batch_rx.clone(); while let Ok(batch) = batch_rx.recv().await { - self.stats.recv_batch().inc(); - self.stats.recv_items().add(batch.len() as _); + // TODO + // stats.recv_batch().inc(); + // stats.recv_items().add(batch.len() as _); for x in &batch { trace3!( "search for {} {} {:?} {:?}", @@ -233,7 +227,8 @@ impl Worker { match self.pg.execute("commit", &[]).await { Ok(n) => { let dt = ts1.elapsed(); - self.stats.commit_duration_ms().ingest((1e3 * dt.as_secs_f32()) as u32); + // TODO + // stats.commit_duration_ms().ingest((1e3 * dt.as_secs_f32()) as u32); if dt > Duration::from_millis(40) { debug!("commit {} {:.0} ms", n, dt.as_secs_f32()); } @@ -313,7 +308,8 @@ impl Worker { match e.0.tx.make_send(item).await { Ok(()) => {} Err(_) => { - self.stats.res_tx_fail.inc(); + // TODO + // stats.res_tx_fail.inc(); } }; } @@ -603,7 +599,6 @@ pub trait HashSalter { pub async fn start_lookup_workers( worker_count: usize, db: &Database, - stats: Arc, ) -> Result< ( Sender, @@ -619,7 +614,7 @@ pub async fn start_lookup_workers( let (batch_rx, bjh) = batchtools::batcher::batch(inp_cap, timeout, batch_out_cap, query_rx); let mut jhs = Vec::new(); for _ in 0..worker_count { - let mut worker = Worker::new(db, batch_rx.clone(), stats.clone()).await?; + let mut worker = Worker::new(db, batch_rx.clone()).await?; let jh = tokio::task::spawn(async move { worker.work::().await }); jhs.push(jh); } @@ -727,7 +722,6 @@ fn test_series_by_channel_01() { let channel = "chn-test-00"; let channel_01 = "chn-test-01"; let channel_02 = "chn-test-02"; - let series_by_channel_stats = Arc::new(SeriesByChannelStats::new()); let pgconf = test_db_conf(); if false { psql_play(&pgconf).await?; @@ -767,8 +761,7 @@ fn test_series_by_channel_01() { } // TODO keep join handles and await later let (channel_info_query_tx, _jhs, _jh) = - dbpg::seriesbychannel::start_lookup_workers::(1, &pgconf, series_by_channel_stats.clone()) - .await?; + dbpg::seriesbychannel::start_lookup_workers::(1, &pgconf).await?; let mut rxs = Vec::new(); let rx = { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index cb2e8bd..5c6e1c4 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -2337,7 +2337,7 @@ impl CaConn { Self::check_ev_value_data(&value.data, &writer.scalar_type())?; crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); - // binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?; + binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?; { let wres = writer.write(CaWriterValue::new(value, crst), tscaproto, tsev, iqdqs)?; crst.status_emit_count += wres.nstatus() as u64; diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 20bf1ef..8630ca8 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -44,7 +44,6 @@ use statemap::ConnectionState; use statemap::ConnectionStateValue; use statemap::WithStatusSeriesIdState; use statemap::WithStatusSeriesIdStateInner; -use stats::IocFinderStats; use std::collections::BTreeMap; use std::collections::VecDeque; use std::fmt; @@ -247,7 +246,6 @@ pub enum CaConnSetItem { pub struct CaConnSetCtrl { tx: Sender, rx: Receiver, - ioc_finder_stats: Arc, jh: JoinHandle>, } @@ -305,10 +303,6 @@ impl CaConnSetCtrl { self.jh.await??; Ok(()) } - - pub fn ioc_finder_stats(&self) -> &Arc { - &self.ioc_finder_stats - } } #[derive(Debug)] @@ -432,14 +426,8 @@ impl CaConnSet { let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200); let (connset_out_tx, connset_out_rx) = async_channel::bounded(200); let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(400); - let ioc_finder_stats = Arc::new(IocFinderStats::new()); - let (find_ioc_query_tx, ioc_finder_jh) = super::finder::start_finder( - find_ioc_res_tx.clone(), - backend.clone(), - ingest_opts, - ioc_finder_stats.clone(), - ) - .unwrap(); + let (find_ioc_query_tx, ioc_finder_jh) = + super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), ingest_opts).unwrap(); let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400); let connset = Self { ticker: Self::new_self_ticker(), @@ -485,7 +473,6 @@ impl CaConnSet { CaConnSetCtrl { tx: connset_inp_tx, rx: connset_out_rx, - ioc_finder_stats, jh, } } diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 826ba8b..9426348 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -12,9 +12,7 @@ use dbpg::iocindex::IocSearchIndexWorker; use dbpg::postgres::Row as PgRow; use log::*; use netpod::Database; -use stats::IocFinderStats; use std::collections::VecDeque; -use std::sync::Arc; use std::time::Duration; use std::time::Instant; use taskrun::tokio; @@ -75,10 +73,9 @@ pub fn start_finder( tx: Sender>, backend: String, opts: CaIngestOpts, - stats: Arc, ) -> Result<(Sender, JoinHandle>), Error> { let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX); - let jh = taskrun::spawn(finder_full(qrx, tx, backend, opts, stats)); + let jh = taskrun::spawn(finder_full(qrx, tx, backend, opts)); Ok((qtx, jh)) } @@ -87,17 +84,10 @@ async fn finder_full( tx: Sender>, backend: String, opts: CaIngestOpts, - stats: Arc, ) -> Result<(), Error> { let (tx1, rx1) = async_channel::bounded(20); - let jh1 = taskrun::spawn(finder_worker( - qrx, - tx1, - backend, - opts.postgresql_config().clone(), - stats.clone(), - )); - let jh2 = taskrun::spawn(finder_network_if_not_found(rx1, tx, opts.clone(), stats)); + let jh1 = taskrun::spawn(finder_worker(qrx, tx1, backend, opts.postgresql_config().clone())); + let jh2 = taskrun::spawn(finder_network_if_not_found(rx1, tx, opts.clone())); jh1.await??; trace!("finder::finder_full awaited A"); jh2.await??; @@ -111,7 +101,6 @@ async fn finder_worker( tx: Sender>, backend: String, db: Database, - stats: Arc, ) -> Result<(), Error> { // TODO do something with join handle let (batch_rx, jh_batch) = @@ -123,7 +112,6 @@ async fn finder_worker( tx.clone(), backend.clone(), db.clone(), - stats.clone(), )); jhs.push(jh); } @@ -141,7 +129,6 @@ async fn finder_worker_single( tx: Sender>, backend: String, db: Database, - stats: Arc, ) -> Result<(), Error> { debug!("finder_worker_single make_pg_client"); let (pg, jh) = make_pg_client(&db).await?; @@ -159,8 +146,9 @@ async fn finder_worker_single( for e in batch.iter().filter(|x| series::dbg::dbg_chn(x.name())) { info!("searching database for {:?}", e); } - stats.dbsearcher_batch_recv().inc(); - stats.dbsearcher_item_recv().add(batch.len() as _); + // TODO + // stats.dbsearcher_batch_recv().inc(); + // stats.dbsearcher_item_recv().add(batch.len() as _); let ts1 = Instant::now(); let (batch, pass_through) = batch.into_iter().fold((Vec::new(), Vec::new()), |(mut a, mut b), x| { if x.use_cache() { @@ -189,9 +177,9 @@ async fn finder_worker_single( } match qres { Ok(rows) => { - stats.dbsearcher_select_res_0().add(rows.len() as _); + // stats.dbsearcher_select_res_0().add(rows.len() as _); if rows.len() != batch.len() { - stats.dbsearcher_select_error_len_mismatch().inc(); + // stats.dbsearcher_select_error_len_mismatch().inc(); error!("query result len {} batch len {}", rows.len(), batch.len()); tokio::time::sleep(Duration::from_millis(1000)).await; continue; @@ -215,8 +203,9 @@ async fn finder_worker_single( let items_len = items.len(); match tx.send(items).await { Ok(_) => { - stats.dbsearcher_batch_send().inc(); - stats.dbsearcher_item_send().add(items_len as _); + // TODO + // stats.dbsearcher_batch_send().inc(); + // stats.dbsearcher_item_send().add(items_len as _); } Err(e) => { error!("finder sees: {}", e); @@ -243,10 +232,9 @@ async fn finder_network_if_not_found( rx: Receiver>, tx: Sender>, opts: CaIngestOpts, - stats: Arc, ) -> Result<(), Error> { let self_name = "finder_network_if_not_found"; - let (net_tx, net_rx, jh_ca_search) = ca_search_workers_start(&opts, stats.clone()).await?; + let (net_tx, net_rx, jh_ca_search) = ca_search_workers_start(&opts).await?; let jh2 = taskrun::spawn(process_net_result(net_rx, tx.clone(), opts.clone())); 'outer: while let Ok(item) = rx.recv().await { let mut res = VecDeque::new(); diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index dabb7cc..9fd1c50 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -9,13 +9,11 @@ use log::*; use proto::CaMsg; use proto::CaMsgTy; use proto::HeadInfo; -use stats::IocFinderStats; use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::Ipv4Addr; use std::net::SocketAddrV4; use std::pin::Pin; -use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::task::Context; @@ -120,7 +118,6 @@ pub struct FindIocStream { thr_msg_1: ThrottleTrace, #[allow(unused)] thr_msg_2: ThrottleTrace, - stats: Arc, } impl FindIocStream { @@ -131,7 +128,6 @@ impl FindIocStream { batch_run_max: Duration, in_flight_max: usize, batch_size: usize, - stats: Arc, ) -> Self { let sock = unsafe { Self::create_socket() }.unwrap(); let afd = AsyncFd::new(sock.0).unwrap(); @@ -159,7 +155,6 @@ impl FindIocStream { thr_msg_0: ThrottleTrace::new(Duration::from_millis(1000)), thr_msg_1: ThrottleTrace::new(Duration::from_millis(1000)), thr_msg_2: ThrottleTrace::new(Duration::from_millis(1000)), - stats, } } @@ -284,10 +279,7 @@ impl FindIocStream { Poll::Ready(Ok(())) } - unsafe fn try_read( - sock: i32, - stats: &IocFinderStats, - ) -> Poll), Error>> { + unsafe fn try_read(sock: i32) -> Poll), Error>> { let tsnow = Instant::now(); let mut saddr_mem = [0u8; std::mem::size_of::()]; let mut saddr_len: libc::socklen_t = saddr_mem.len() as _; @@ -310,14 +302,14 @@ impl FindIocStream { return Poll::Ready(Err(Error::ReadFailure)); } } else if ec < 0 { - stats.ca_udp_io_error().inc(); + // stats.ca_udp_io_error().inc(); error!("unexpected received {ec}"); Poll::Ready(Err(Error::ReadFailure)) } else if ec == 0 { - stats.ca_udp_io_empty().inc(); + // stats.ca_udp_io_empty().inc(); Poll::Ready(Err(Error::ReadEmpty)) } else { - stats.ca_udp_io_recv().inc(); + // stats.ca_udp_io_recv().inc(); let saddr2: libc::sockaddr_in = unsafe { std::mem::transmute_copy(&saddr_mem) }; let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes()); let src_port = u16::from_be(saddr2.sin_port); @@ -366,15 +358,15 @@ impl FindIocStream { accounted += 16 + hi.payload_len(); } if accounted != ec as u32 { - stats.ca_udp_unaccounted_data().inc(); + // stats.ca_udp_unaccounted_data().inc(); debug!("unaccounted data ec {} accounted {}", ec, accounted); } if msgs.len() < 1 { - stats.ca_udp_warn().inc(); + // stats.ca_udp_warn().inc(); debug!("received answer without messages"); } if msgs.len() == 1 { - stats.ca_udp_warn().inc(); + // stats.ca_udp_warn().inc(); debug!("received answer with single message: {msgs:?}"); } let mut good = true; @@ -384,7 +376,7 @@ impl FindIocStream { good = false; } } else { - stats.ca_udp_first_msg_not_version().inc(); + // stats.ca_udp_first_msg_not_version().inc(); } // trace2!("recv {:?} {:?}", src_addr, msgs); let mut res = Vec::new(); @@ -398,7 +390,7 @@ impl FindIocStream { res.push((SearchId(k.id), addr)); } _ => { - stats.ca_udp_error().inc(); + // stats.ca_udp_error().inc(); warn!("try_read: unknown message received {:?}", msg.ty); } } @@ -449,7 +441,7 @@ impl FindIocStream { }; self.in_flight.insert(bid.clone(), batch); self.batch_send_queue.push_back(bid); - self.stats.ca_udp_batch_created().inc(); + // stats.ca_udp_batch_created().inc(); } fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) { @@ -477,11 +469,11 @@ impl FindIocStream { dt, }; // trace!("udp search response {res:?}"); - self.stats.ca_udp_recv_result().inc(); + // stats.ca_udp_recv_result().inc(); self.out_queue.push_back(res); } None => { - self.stats.ca_udp_logic_error().inc(); + // stats.ca_udp_logic_error().inc(); error!( "logic error batch sids / channels lens: {} vs {}", batch.sids.len(), @@ -537,7 +529,7 @@ impl FindIocStream { sids.push(sid.clone()); chns.push(batch.channels[i2].clone()); dts.push(dt); - self.stats.ca_udp_recv_timeout().inc(); + // stats.ca_udp_recv_timeout().inc(); } } bids.push(bid.clone()); @@ -692,7 +684,7 @@ impl Stream for FindIocStream { break match self.afd.poll_read_ready(cx) { Ready(Ok(mut g)) => { // debug!("BLOCK AA"); - match unsafe { Self::try_read(self.sock.0, &self.stats) } { + match unsafe { Self::try_read(self.sock.0) } { Ready(Ok((src, res))) => { self.handle_result(src, res); if self.ready_for_end_of_stream() { diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 092a7ab..e8c7a00 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -4,12 +4,10 @@ use async_channel::Receiver; use async_channel::Sender; use futures_util::StreamExt; use log::*; -use stats::IocFinderStats; use std::collections::VecDeque; use std::net::IpAddr; use std::net::SocketAddr; use std::net::SocketAddrV4; -use std::sync::Arc; use std::time::Duration; use taskrun::tokio; use tokio::task::JoinHandle; @@ -59,7 +57,6 @@ async fn resolve_address(addr_str: &str) -> Result { pub async fn ca_search_workers_start( opts: &CaIngestOpts, - stats: Arc, ) -> Result< ( Sender, @@ -72,7 +69,7 @@ pub async fn ca_search_workers_start( let batch_run_max = Duration::from_millis(800); let (inp_tx, inp_rx) = async_channel::bounded(256); let (out_tx, out_rx) = async_channel::bounded(256); - let finder = FindIocStream::new(inp_rx, search_tgts, blacklist, batch_run_max, 20, 16, stats); + let finder = FindIocStream::new(inp_rx, search_tgts, blacklist, batch_run_max, 20, 16); let jh = taskrun::spawn(finder_run(finder, out_tx)); Ok((inp_tx, out_rx, jh)) } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 6bb498f..f245ba5 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -33,11 +33,6 @@ use scywr::iteminsertqueue::QueryItem; use serde::Deserialize; use serde::Serialize; use serde_json::json; -use stats::CaProtoStats; -use stats::DaemonStats; -use stats::InsertWorkerStats; -use stats::IocFinderStats; -use stats::SeriesByChannelStats; use std::collections::BTreeMap; use std::collections::HashMap; use std::net::SocketAddr; @@ -128,28 +123,12 @@ impl IntoResponse for CustomErrorResponse { #[derive(Clone)] pub struct StatsSet { - daemon: Arc, - insert_worker_stats: Arc, - series_by_channel_stats: Arc, - ioc_finder_stats: Arc, insert_frac: Arc, } impl StatsSet { - pub fn new( - daemon: Arc, - insert_worker_stats: Arc, - series_by_channel_stats: Arc, - ioc_finder_stats: Arc, - insert_frac: Arc, - ) -> Self { - Self { - daemon, - insert_worker_stats, - series_by_channel_stats, - ioc_finder_stats, - insert_frac, - } + pub fn new(insert_frac: Arc) -> Self { + Self { insert_frac } } } @@ -340,23 +319,12 @@ impl DaemonComm { fn metricbeat(stats_set: &StatsSet) -> axum::Json { let mut map = serde_json::Map::new(); - map.insert("daemon".to_string(), stats_set.daemon.json()); - map.insert("insert_worker_stats".to_string(), stats_set.insert_worker_stats.json()); + // map.insert("insert_worker_stats".to_string(), stats_set.insert_worker_stats.json()); let mut ret = serde_json::Map::new(); ret.insert("daqingest".to_string(), serde_json::Value::Object(map)); axum::Json(serde_json::Value::Object(ret)) } -fn metrics(stats_set: &StatsSet) -> String { - let ss = [ - stats_set.daemon.prometheus(), - stats_set.insert_worker_stats.prometheus(), - stats_set.series_by_channel_stats.prometheus(), - stats_set.ioc_finder_stats.prometheus(), - ]; - ss.join("") -} - pub struct RoutesResources { backend: String, worker_tx: Sender, @@ -427,10 +395,8 @@ fn make_routes( let dcom = dcom.clone(); let stats_set = stats_set.clone(); || async move { - let prom2 = metrics2(dcom).await.unwrap_or(String::new()); - let mut s = metrics(&stats_set); - s.push_str(&prom2); - s + let prom = metrics2(dcom).await.unwrap_or(String::new()); + prom } }), ), diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 8fdc7f1..f7e0631 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -1,7 +1,7 @@ use crate::config::ScyllaIngestConfig; use crate::iteminsertqueue::Accounting; use crate::iteminsertqueue::AccountingRecv; -use crate::iteminsertqueue::BinWriteIndexV03; +use crate::iteminsertqueue::BinWriteIndexV04; use crate::iteminsertqueue::InsertFut; use crate::iteminsertqueue::InsertItem; use crate::iteminsertqueue::MspItem; @@ -20,7 +20,6 @@ use log; use netpod::ttl::RetentionTime; use smallvec::SmallVec; use smallvec::smallvec; -use stats::InsertWorkerStats; use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; @@ -53,22 +52,25 @@ autoerr::create_error_v1!( }, ); -fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqueue::InsertFutError) { +fn stats_inc_for_err(err: &crate::iteminsertqueue::InsertFutError) { use crate::iteminsertqueue::InsertFutError; match err { InsertFutError::Execution(e) => match e { scylla::errors::ExecutionError::RequestTimeout(_) => { - stats.db_timeout().inc(); + // TODO + // stats.db_timeout().inc(); } _ => { if true { warn!("db error {}", err); } - stats.db_error().inc(); + // TODO + // stats.db_error().inc(); } }, InsertFutError::NoFuture => { - stats.logic_error().inc(); + // TODO + // stats.logic_error().inc(); } } } @@ -108,7 +110,6 @@ pub async fn spawn_scylla_insert_workers( insert_worker_concurrency: usize, item_inp: Receiver>, insert_worker_opts: Arc, - store_stats: Arc, use_rate_limit_queue: bool, ignore_writes: bool, tx: Sender, @@ -133,7 +134,6 @@ pub async fn spawn_scylla_insert_workers( insert_worker_opts.clone(), Some(data_store), ignore_writes, - store_stats.clone(), tx.clone(), )); jhs.push(jh); @@ -146,7 +146,6 @@ pub async fn spawn_scylla_insert_workers_dummy( insert_worker_concurrency: usize, item_inp: Receiver>, insert_worker_opts: Arc, - store_stats: Arc, tx: Sender, ) -> Result>>, Error> { let mut jhs = Vec::new(); @@ -159,7 +158,6 @@ pub async fn spawn_scylla_insert_workers_dummy( insert_worker_opts.clone(), data_store, true, - store_stats.clone(), tx.clone(), )); jhs.push(jh); @@ -214,7 +212,6 @@ async fn worker_streamed( insert_worker_opts: Arc, data_store: Option>, ignore_writes: bool, - stats: Arc, tx: Sender, ) -> Result<(), Error> { debug_setup!("worker_streamed begin"); @@ -231,7 +228,7 @@ async fn worker_streamed( .map_or_else(|| format!("dummy"), |x| x.rett.debug_tag().to_string()); let stream = inspect_items(stream, worker_name.clone()); if let Some(data_store) = data_store { - let stream = transform_to_db_futures(stream, data_store, ignore_writes, stats.clone()); + let stream = transform_to_db_futures(stream, data_store, ignore_writes); let stream = stream .map(|x| futures_util::stream::iter(x)) .flatten_unordered(Some(1)) @@ -254,7 +251,7 @@ async fn worker_streamed( } Err(e) => { mett.job_err().inc(); - stats_inc_for_err(&stats, &e); + stats_inc_for_err(&e); } } if mett_emit_last + metrics_ivl <= tsnow { @@ -276,7 +273,8 @@ async fn worker_streamed( drop(item); } }; - stats.worker_finish().inc(); + // TODO + // stats.worker_finish().inc(); insert_worker_opts .insert_workers_running .fetch_sub(1, atomic::Ordering::AcqRel); @@ -293,7 +291,6 @@ fn transform_to_db_futures( item_inp: S, data_store: Arc, ignore_writes: bool, - stats: Arc, ) -> impl Stream> where S: Stream>, @@ -302,7 +299,8 @@ where // TODO possible without box? // let item_inp = Box::pin(item_inp); item_inp.map(move |batch| { - stats.item_recv.inc(); + // TODO + // stats.item_recv.inc(); trace_transform!("transform_to_db_futures have batch len {}", batch.len()); let tsnow = Instant::now(); let mut res = Vec::with_capacity(32); @@ -329,11 +327,11 @@ where prepare_timebin_v02_insert_futs(item, &data_store, tsnow) } } - QueryItem::BinWriteIndexV03(item) => { + QueryItem::BinWriteIndexV04(item) => { if ignore_writes { SmallVec::new() } else { - prepare_bin_write_index_v03_insert_futs(item, &data_store, tsnow) + prepare_bin_write_index_v04_insert_futs(item, &data_store, tsnow) } } QueryItem::Accounting(item) => { @@ -377,8 +375,8 @@ fn inspect_items( QueryItem::TimeBinSimpleF32V02(_) => { trace_item_execute!("execute {worker_name} TimeBinSimpleF32V02"); } - QueryItem::BinWriteIndexV03(_) => { - trace_item_execute!("execute {worker_name} BinWriteIndexV03"); + QueryItem::BinWriteIndexV04(_) => { + trace_item_execute!("execute {worker_name} BinWriteIndexV04"); } QueryItem::Accounting(_) => { trace_item_execute!("execute {worker_name} Accounting {item:?}"); @@ -458,15 +456,15 @@ fn prepare_timebin_v02_insert_futs( futs } -fn prepare_bin_write_index_v03_insert_futs( - item: BinWriteIndexV03, +fn prepare_bin_write_index_v04_insert_futs( + item: BinWriteIndexV04, data_store: &Arc, tsnow: Instant, ) -> SmallVec<[FutJob; 4]> { - let params = (item.series, item.pbp, item.msp, item.rt, item.lsp, item.binlen); + let params = (item.series, item.pbp, item.msp, item.lsp, item.binlen); let fut = InsertFut::new( data_store.scy.clone(), - data_store.qu_insert_bin_write_index_v03.clone(), + data_store.qu_insert_bin_write_index_v04.clone(), params, ); let fut = FutJob { fut, ts_net: tsnow }; diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index dea5178..57485d0 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -554,11 +554,10 @@ pub struct TimeBinSimpleF32V02 { } #[derive(Debug, Clone)] -pub struct BinWriteIndexV03 { +pub struct BinWriteIndexV04 { pub series: i64, pub pbp: i16, pub msp: i32, - pub rt: i16, pub lsp: i32, pub binlen: i32, } @@ -569,7 +568,7 @@ pub enum QueryItem { Insert(InsertItem), Msp(MspItem), TimeBinSimpleF32V02(TimeBinSimpleF32V02), - BinWriteIndexV03(BinWriteIndexV03), + BinWriteIndexV04(BinWriteIndexV04), Accounting(Accounting), AccountingRecv(AccountingRecv), } diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 984873a..146f0f5 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -615,17 +615,16 @@ async fn migrate_scylla_data_schema( let tab = GenTwcsTab::new( ks, rett.table_prefix(), - "bin_write_index_v03", + "bin_write_index_v04", &[ ("series", "bigint"), ("pbp", "smallint"), ("msp", "int"), - ("rt", "smallint"), ("lsp", "int"), ("binlen", "int"), ], ["series", "pbp", "msp"], - ["rt", "lsp", "binlen"], + ["lsp", "binlen"], rett.ttl_binned(), ); tab.setup(chs, scy).await?; @@ -707,6 +706,12 @@ async fn migrate_scylla_data_schema( chs.add_todo(format!("drop table {}.{}", ks, tn)); } } + { + let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v03"); + if has_table(&ks, &tn, scy).await? { + chs.add_todo(format!("drop table {}.{}", ks, tn)); + } + } Ok(()) } diff --git a/scywr/src/store.rs b/scywr/src/store.rs index fa04159..fdaef3d 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -45,7 +45,7 @@ pub struct DataStore { pub qu_insert_array_f64: Arc, pub qu_insert_array_bool: Arc, pub qu_insert_binned_scalar_f32_v02: Arc, - pub qu_insert_bin_write_index_v03: Arc, + pub qu_insert_bin_write_index_v04: Arc, pub qu_account_00: Arc, pub qu_account_recv_00: Arc, pub qu_dummy: Arc, @@ -157,10 +157,10 @@ impl DataStore { scy ); - let qu_insert_bin_write_index_v03 = prep_qu_ins_c!( - "bin_write_index_v03", - "series, pbp, msp, rt, lsp, binlen", - "?, ?, ?, ?, ?, ?", + let qu_insert_bin_write_index_v04 = prep_qu_ins_c!( + "bin_write_index_v04", + "series, pbp, msp, lsp, binlen", + "?, ?, ?, ?, ?", rett, scy ); @@ -219,7 +219,7 @@ impl DataStore { qu_insert_array_f64, qu_insert_array_bool, qu_insert_binned_scalar_f32_v02, - qu_insert_bin_write_index_v03, + qu_insert_bin_write_index_v04, qu_account_00, qu_account_recv_00, qu_dummy, diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index 78b21d8..b6dd2dd 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -13,7 +13,7 @@ use netpod::Shape; use netpod::TsNano; use netpod::ttl::RetentionTime; use scywr::insertqueues::InsertDeques; -use scywr::iteminsertqueue::BinWriteIndexV03; +use scywr::iteminsertqueue::BinWriteIndexV04; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::TimeBinSimpleF32V02; use serde::Serialize; @@ -104,6 +104,28 @@ impl IndexWritten { } } +#[derive(Debug, Serialize)] +struct BinnerStateA { + rt: RetentionTime, + binner: BinnedEventsTimeweight, + write_zero: WriteCntZero, + pbp: PrebinnedPartitioning, + index_written_1: IndexWritten, + index_written_2: Option, + discard_front: u8, +} + +#[derive(Debug, Serialize)] +struct BinnerStateB { + rt: RetentionTime, + binner: BinnedBinsTimeweight, + write_zero: WriteCntZero, + pbp: PrebinnedPartitioning, + index_written_1: IndexWritten, + index_written_2: Option, + discard_front: u8, +} + #[derive(Debug, Serialize)] pub struct BinWriter { chname: String, @@ -112,22 +134,8 @@ pub struct BinWriter { scalar_type: ScalarType, shape: Shape, evbuf: ContainerEvents, - binner_1st: Option<( - RetentionTime, - BinnedEventsTimeweight, - WriteCntZero, - PrebinnedPartitioning, - IndexWritten, - Option, - )>, - binner_others: Vec<( - RetentionTime, - BinnedBinsTimeweight, - WriteCntZero, - PrebinnedPartitioning, - IndexWritten, - Option, - )>, + binner_1st: Option, + binner_others: Vec, trd: bool, } @@ -242,7 +250,16 @@ impl BinWriter { } else { None }; - binner_1st = Some((rt, binner, write_zero, pbp, IndexWritten::new(), iw2)); + let st = BinnerStateA { + rt, + binner, + write_zero, + pbp, + index_written_1: IndexWritten::new(), + index_written_2: iw2, + discard_front: 0, + }; + binner_1st = Some(st); } else { let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len()); let mut binner = BinnedBinsTimeweight::new(range); @@ -254,7 +271,16 @@ impl BinWriter { } else { None }; - binner_others.push((rt, binner, write_zero, pbp, IndexWritten::new(), iw2)); + let st = BinnerStateB { + rt, + binner, + write_zero, + pbp, + index_written_1: IndexWritten::new(), + index_written_2: iw2, + discard_front: 0, + }; + binner_others.push(st); } } let ret = Self { @@ -322,17 +348,11 @@ impl BinWriter { fn tick_ingest_and_handle(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { let buf = &self.evbuf; - if let Some(ee) = self.binner_1st.as_mut() { - let rt = ee.0.clone(); - let write_zero = ee.2.clone(); - let binner = &mut ee.1; - let pbp = ee.3.clone(); - let index_written = &mut ee.4; - let iw2 = &mut ee.5; + if let Some(st) = self.binner_1st.as_mut() { // TODO avoid boxing let bufbox = Box::new(buf); use items_0::timebin::IngestReport; - let consumed_evs = match binner.ingest(&bufbox)? { + let consumed_evs = match st.binner.ingest(&bufbox)? { IngestReport::ConsumedAll => { let n = bufbox.len(); self.evbuf.clear(); @@ -343,28 +363,28 @@ impl BinWriter { n } }; - let bins = binner.output(); + let bins = st.binner.output(); if bins.len() > 0 { trace_bin!(self.trd, "binner_1st out len {}", bins.len()); Self::handle_output_ready( self.trd, true, self.sid, - rt, + st.rt.clone(), &bins, - write_zero, - index_written, - iw2, - pbp, + st.write_zero.clone(), + &mut st.index_written_1, + &mut st.index_written_2, + st.pbp.clone(), + &mut st.discard_front, iqdqs, )?; // TODO avoid boxing let mut bins2: BinsBoxed = Box::new(bins); for i in 0..self.binner_others.len() { - let (rt, binner, write_zero, pbp, index_written, iw2) = &mut self.binner_others[i]; - let write_zero = write_zero.clone(); - binner.ingest(&bins2)?; - let bb: Option = binner.output()?; + let st = &mut self.binner_others[i]; + st.binner.ingest(&bins2)?; + let bb: Option = st.binner.output()?; match bb { Some(bb) => { if bb.len() > 0 { @@ -374,12 +394,13 @@ impl BinWriter { self.trd, false, self.sid, - rt.clone(), + st.rt.clone(), &bb2, - write_zero, - index_written, - iw2, - pbp.clone(), + st.write_zero.clone(), + &mut st.index_written_1, + &mut st.index_written_2, + st.pbp.clone(), + &mut st.discard_front, iqdqs, )?; } else { @@ -419,6 +440,7 @@ impl BinWriter { iw1: &mut IndexWritten, iw2: &mut Option, pbp: PrebinnedPartitioning, + discard_front: &mut u8, iqdqs: &mut InsertDeques, ) -> Result<(), Error> { let selfname = "handle_output_ready"; @@ -446,80 +468,81 @@ impl BinWriter { let e = Error::UnexpectedBinLen(bin_len, pbp); return Err(e); } - { - let (msp, lsp) = pbp.msp_lsp(ts1.to_ts_ms()); - let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 { - series, - binlen: bin_len.ms() as i32, - msp: msp as i64, - off: lsp as i32, - cnt: cnt as i64, - min, - max, - avg, - dev: f32::NAN, - lst, - }); - if true || bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) { - debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item); + if *discard_front < 1 { + *discard_front += 1; + } else { + { + let (msp, lsp) = pbp.msp_lsp(ts1.to_ts_ms()); + let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 { + series, + binlen: bin_len.ms() as i32, + msp: msp as i64, + off: lsp as i32, + cnt: cnt as i64, + min, + max, + avg, + dev: f32::NAN, + lst, + }); + if true || bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) { + debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item); + } + let qu = iqdqs.deque(rt.clone()); + qu.push_back(item); } - let qu = iqdqs.deque(rt.clone()); - qu.push_back(item); - } - if pbp.uses_index_min10() { - let pbp_ix = PrebinnedPartitioning::Min10; - let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms()); - debug_bin!( - trd, - "handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}", - series, - pbp_ix, - pbp, - rt, - msp, - lsp - ); - let iw = iw2.as_mut().unwrap(); - if iw.should_write(msp, lsp) { - iw.mark_written(msp, lsp); - let item = BinWriteIndexV03 { - series: series.id() as i64, - pbp: pbp_ix.db_ix() as i16, - msp: msp as i32, - rt: rt.to_index_db_i32() as i16, - lsp: lsp as i32, - binlen: pbp.bin_len().ms() as i32, - }; - let item = QueryItem::BinWriteIndexV03(item); - iqdqs.deque(rt.clone()).push_back(item); + if pbp.uses_index_min10() { + let pbp_ix = PrebinnedPartitioning::Min10; + let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms()); + debug_bin!( + trd, + "handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}", + series, + pbp_ix, + pbp, + rt, + msp, + lsp + ); + let iw = iw2.as_mut().unwrap(); + if iw.should_write(msp, lsp) { + iw.mark_written(msp, lsp); + let item = BinWriteIndexV04 { + series: series.id() as i64, + pbp: pbp_ix.db_ix() as i16, + msp: msp as i32, + lsp: lsp as i32, + binlen: pbp.bin_len().ms() as i32, + }; + let item = QueryItem::BinWriteIndexV04(item); + iqdqs.deque(rt.clone()).push_back(item); + } } - } - if true { - let pbp_ix = PrebinnedPartitioning::Day1; - let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms()); - debug_bin!( - trd, - "handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}", - series, - pbp_ix, - pbp, - rt, - msp, - lsp - ); - // let iw = iw1; - if iw1.should_write(msp, lsp) { - iw1.mark_written(msp, lsp); - let item = BinWriteIndexV03 { - series: series.id() as i64, - pbp: pbp_ix.db_ix() as i16, - msp: msp as i32, - rt: rt.to_index_db_i32() as i16, - lsp: lsp as i32, - binlen: pbp.bin_len().ms() as i32, - }; - let item = QueryItem::BinWriteIndexV03(item); - iqdqs.deque(rt.clone()).push_back(item); + if true { + let pbp_ix = PrebinnedPartitioning::Day1; + let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms()); + debug_bin!( + trd, + "handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}", + series, + pbp_ix, + pbp, + rt, + msp, + lsp + ); + if iw1.should_write(msp, lsp) { + iw1.mark_written(msp, lsp); + let item = BinWriteIndexV04 { + series: series.id() as i64, + pbp: pbp_ix.db_ix() as i16, + msp: msp as i32, + lsp: lsp as i32, + binlen: pbp.bin_len().ms() as i32, + }; + let item = QueryItem::BinWriteIndexV04(item); + iqdqs.deque(rt.clone()).push_back(item); + } } } } diff --git a/stats/mettdecl.rs b/stats/mettdecl.rs index a4cdace..3922d85 100644 --- a/stats/mettdecl.rs +++ b/stats/mettdecl.rs @@ -137,6 +137,29 @@ mod Metrics { } } +// mod Metrics { +// type StructName = IocFinderMetrics; +// enum counters { +// dbsearcher_batch_recv, +// dbsearcher_item_recv, +// dbsearcher_select_res_0, +// dbsearcher_select_error_len_mismatch, +// dbsearcher_batch_send, +// dbsearcher_item_send, +// ca_udp_error, +// ca_udp_warn, +// ca_udp_unaccounted_data, +// ca_udp_batch_created, +// ca_udp_io_error, +// ca_udp_io_empty, +// ca_udp_io_recv, +// ca_udp_first_msg_not_version, +// ca_udp_recv_result, +// ca_udp_recv_timeout, +// ca_udp_logic_error, +// } +// } + mod Metrics { type StructName = DaemonMetrics; mod Compose { diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 2e42c41..b968ae1 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -2,15 +2,9 @@ pub mod mett; pub use rand_xoshiro; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; use std::time::Duration; use std::time::Instant; -const US: u64 = 1000; -const MS: u64 = US * 1000; -const SEC: u64 = MS * 1000; - pub type EMA = Ema32; #[derive(Clone, Debug)] @@ -283,126 +277,6 @@ impl XorShift32 { } } -stats_proc::stats_struct!(( - stats_struct( - name(DaemonStats), - prefix(daemon), - counters(asdasd,), - values( - channel_unknown_address, - channel_search_pending, - channel_with_address, - channel_no_address, - connset_health_lat_ema, - // iqtx_len_st_rf1, - iqtx_len_st_rf3, - iqtx_len_mt_rf3, - iqtx_len_lt_rf3, - iqtx_len_lt_rf3_lat5, - ), - ), - agg(name(DaemonStatsAgg), parent(DaemonStats)), - diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)), - stats_struct( - name(CaProtoStats), - prefix(ca_proto), - counters( - // tcp_recv_count, - // tcp_recv_bytes, - protocol_issue, - payload_std_too_large, - payload_ext_but_small, - payload_ext_very_large, - out_msg_placed, - out_bytes, - ), - histolog2s(payload_size, data_count, outbuf_len,), - ), - stats_struct( - name(SeriesByChannelStats), - prefix(seriesbychannel), - counters(res_tx_fail, res_tx_timeout, recv_batch, recv_items,), - histolog2s(commit_duration_ms), - ), - stats_struct( - name(InsertWorkerStats), - prefix(insert_worker), - counters( - logic_error, - item_recv, - inserted_values, - inserted_connection_status, - inserted_channel_status, - fraction_drop, - inserted_mute, - inserted_interval, - inserted_channel_info, - inserted_binned, - db_overload, - db_timeout, - db_unavailable, - db_error, - query_error, - inserts_msp, - inserts_msp_grid, - inserts_value, - ratelimit_drop, - worker_start, - worker_finish, - ), - histolog2s(item_lat_net_worker, item_lat_net_store,), - ), - stats_struct( - name(IocFinderStats), - prefix(ioc_finder), - counters( - dbsearcher_batch_recv, - dbsearcher_item_recv, - dbsearcher_select_res_0, - dbsearcher_select_error_len_mismatch, - dbsearcher_batch_send, - dbsearcher_item_send, - ca_udp_error, - ca_udp_warn, - ca_udp_unaccounted_data, - ca_udp_batch_created, - ca_udp_io_error, - ca_udp_io_empty, - ca_udp_io_recv, - ca_udp_first_msg_not_version, - ca_udp_recv_result, - ca_udp_recv_timeout, - ca_udp_logic_error, - ), - values(db_lookup_workers,) - ), - stats_struct( - name(SeriesWriterEstablishStats), - prefix(wrest), - counters(job_recv, result_send_fail,), - ), -)); - -stats_proc::stats_struct!(( - stats_struct(name(TestStats0), counters(count0,), values(val0),), - diff(name(TestStats0Diff), input(TestStats0)), - agg(name(TestStats0Agg), parent(TestStats0)), - diff(name(TestStats0AggDiff), input(TestStats0Agg)), -)); - -#[test] -fn test0_diff() { - let stats_a = TestStats0::new(); - stats_a.count0().inc(); - stats_a.val0().set(43); - let stats_b = stats_a.snapshot(); - stats_b.count0().inc(); - stats_b.count0().inc(); - stats_b.count0().inc(); - let diff = TestStats0Diff::diff_from(&stats_a, &stats_b); - assert_eq!(diff.count0.load(), 3); -} - pub fn xoshiro_from_time() -> rand_xoshiro::Xoshiro128PlusPlus { use rand_xoshiro::rand_core::SeedableRng; use std::time::SystemTime;