From b21dfae5609601b037ccd7fdf275ecb84a3ccc75 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 14 Sep 2023 16:55:53 +0200
Subject: [PATCH] Try settings on sf-daqsync-02

---
 daqingest/src/daemon.rs           |  38 +++++++---
 dbpg/src/conn.rs                  |  10 ++-
 dbpg/src/pool.rs                  |   2 +-
 dbpg/src/seriesbychannel.rs       |   5 +-
 ingest-bsread/src/bsreadclient.rs |   1 +
 netfetch/src/ca/conn.rs           |  68 +++++++++++------
 netfetch/src/ca/connset.rs        | 122 +++++++++++++++++-------------
 netfetch/src/ca/finder.rs         |   2 +-
 netfetch/src/ca/search.rs         |   4 +-
 netfetch/src/conf.rs              |   4 +-
 netfetch/src/metrics.rs           |  33 ++++++--
 netfetch/src/throttletrace.rs     |  11 +--
 netfetch/src/timebin.rs           |   6 +-
 scywr/src/insertworker.rs         |  62 +++++++++++----
 scywr/src/iteminsertqueue.rs      |  53 ++++++++-----
 stats/src/stats.rs                |  34 ++++++++-
 16 files changed, 296 insertions(+), 159 deletions(-)

diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs
index 743c060..23344bd 100644
--- a/daqingest/src/daemon.rs
+++ b/daqingest/src/daemon.rs
@@ -58,7 +58,7 @@ enum CheckPeriodic {
 pub struct DaemonOpts {
     backend: String,
     local_epics_hostname: String,
-    array_truncate: usize,
+    array_truncate: u64,
     insert_item_queue_cap: usize,
     pgconf: Database,
     scyconf: ScyllaConfig,
@@ -67,6 +67,7 @@ pub struct DaemonOpts {
     test_bsread_addr: Option<String>,
     insert_worker_count: usize,
     insert_scylla_sessions: usize,
+    insert_frac: Arc<AtomicU64>,
 }
 
 impl DaemonOpts {
@@ -90,6 +91,7 @@ pub struct Daemon {
     insert_workers_jh: Vec<JoinHandle<Result<(), Error>>>,
     stats: Arc<DaemonStats>,
     insert_worker_stats: Arc<InsertWorkerStats>,
+    series_by_channel_stats: Arc<SeriesByChannelStats>,
     shutting_down: bool,
     connset_ctrl: CaConnSetCtrl,
     connset_status_last: CheckPeriodic,
@@ -107,7 +109,7 @@ impl Daemon {
 
         // TODO keep join handles and await later
         let (channel_info_query_tx, jhs, jh) =
-            dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf, series_by_channel_stats)
+            dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf, series_by_channel_stats.clone())
                 .await
                 .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
 
@@ -152,13 +154,12 @@ impl Daemon {
 
         let use_rate_limit_queue = false;
 
-        // TODO use a new stats type:
-        let store_stats = Arc::new(stats::CaConnStats::new());
         let ttls = opts.ttls.clone();
         let insert_worker_opts = InsertWorkerOpts {
             store_workers_rate: Arc::new(AtomicU64::new(20000000)),
             insert_workers_running: Arc::new(AtomicU64::new(0)),
-            insert_frac: Arc::new(AtomicU64::new(1000)),
+            insert_frac: opts.insert_frac.clone(),
+            array_truncate: Arc::new(AtomicU64::new(opts.array_truncate)),
         };
         let insert_worker_opts = Arc::new(insert_worker_opts);
         let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers(
@@ -172,6 +173,8 @@ impl Daemon {
             ttls,
         )
         .await?;
+        let stats = Arc::new(DaemonStats::new());
+        stats.insert_worker_spawned().add(insert_workers_jh.len() as _);
 
         #[cfg(feature = "bsread")]
         if let Some(bsaddr) = &opts.test_bsread_addr {
@@ -219,8 +222,9 @@ impl Daemon {
             count_assigned: 0,
             last_status_print: SystemTime::now(),
             insert_workers_jh,
-            stats: Arc::new(DaemonStats::new()),
+            stats,
             insert_worker_stats,
+            series_by_channel_stats,
             shutting_down: false,
             connset_ctrl: conn_set_ctrl,
             connset_status_last: CheckPeriodic::Waiting(Instant::now()),
@@ -516,18 +520,20 @@ impl Daemon {
                 }
             }
         }
-        warn!("TODO wait for insert workers");
         while let Some(jh) = self.insert_workers_jh.pop() {
             match jh.await.map_err(Error::from_string) {
                 Ok(x) => match x {
                     Ok(()) => {
-                        debug!("joined insert worker");
+                        self.stats.insert_worker_join_ok().inc();
+                        // debug!("joined insert worker");
                     }
                     Err(e) => {
+                        self.stats.insert_worker_join_ok_err().inc();
error!("joined insert worker, error {e}"); } }, Err(e) => { + self.stats.insert_worker_join_err().inc(); error!("insert worker join error {e}"); } } @@ -556,11 +562,11 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> ingest_linux::signal::set_signal_handler(libc::SIGINT, handler_sigint).map_err(Error::from_string)?; ingest_linux::signal::set_signal_handler(libc::SIGTERM, handler_sigterm).map_err(Error::from_string)?; - let pg = dbpg::conn::make_pg_client(opts.postgresql_config()) + let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()) .await .map_err(Error::from_string)?; - dbpg::schema::schema_check(&pg).await.map_err(Error::from_string)?; + drop(pg); scywr::schema::migrate_scylla_data_schema(opts.scylla_config()) .await @@ -578,6 +584,8 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> channels.clear(); } + let insert_frac = Arc::new(AtomicU64::new(opts.insert_frac())); + let opts2 = DaemonOpts { backend: opts.backend().into(), local_epics_hostname: opts.local_epics_hostname().into(), @@ -594,20 +602,26 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> test_bsread_addr: opts.test_bsread_addr.clone(), insert_worker_count: opts.insert_worker_count(), insert_scylla_sessions: opts.insert_scylla_sessions(), + insert_frac: insert_frac.clone(), }; let daemon = Daemon::new(opts2).await?; let tx = daemon.tx.clone(); let daemon_stats = daemon.stats().clone(); let connset_cmd_tx = daemon.connset_ctrl.sender().clone(); + let ca_conn_stats = daemon.connset_ctrl.ca_conn_stats().clone(); let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8); let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone())); let metrics_jh = { + let conn_set_stats = daemon.connset_ctrl.stats().clone(); let stats_set = StatsSet::new( daemon_stats, - daemon.connset_ctrl.stats().clone(), + conn_set_stats, + ca_conn_stats, daemon.insert_worker_stats.clone(), + daemon.series_by_channel_stats.clone(), + insert_frac, ); let fut = netfetch::metrics::metrics_service(opts.api_bind(), dcom, connset_cmd_tx, stats_set, metrics_shutdown_rx); @@ -625,7 +639,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> Ok(()) => {} Err(_) => break, } - thr_msg.trigger_fmt("sent ChannelAdd", &[&i as &_]); + thr_msg.trigger("sent ChannelAdd", &[&i as &_]); i += 1; } debug!("{} configured channels applied", channels.len()); diff --git a/dbpg/src/conn.rs b/dbpg/src/conn.rs index 0aca872..e5afbc2 100644 --- a/dbpg/src/conn.rs +++ b/dbpg/src/conn.rs @@ -2,16 +2,20 @@ use crate::err::Error; use log::*; use netpod::Database; use taskrun::tokio; +use tokio::task::JoinHandle; use tokio_postgres::Client; pub type PgClient = Client; -pub async fn make_pg_client(dbconf: &Database) -> Result { +pub async fn make_pg_client(dbconf: &Database) -> Result<(PgClient, JoinHandle>), Error> { let d = dbconf; let url = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); info!("connect to {url}"); let (client, pg_conn) = tokio_postgres::connect(&url, tokio_postgres::tls::NoTls).await?; // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: - tokio::spawn(pg_conn); - Ok(client) + let jh = tokio::spawn(async move { + pg_conn.await?; + Ok(()) + }); + Ok((client, jh)) } diff --git a/dbpg/src/pool.rs b/dbpg/src/pool.rs index 8641a98..2d84f16 100644 --- a/dbpg/src/pool.rs +++ b/dbpg/src/pool.rs @@ -79,7 +79,7 @@ impl PgPool { pub async fn new(cap: usize, dbconf: &Database) -> Result { let 
        let (tx, rx) = async_channel::bounded(2 + cap);
         for _ in 0..cap {
-            let pgc = crate::conn::make_pg_client(dbconf).await?;
+            let (pgc, jh) = crate::conn::make_pg_client(dbconf).await?;
             let pgc = PgClientInner { pgc, handout_count: 0 };
             tx.send(pgc).await?;
         }
diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs
index 9a6c1d7..7ca5177 100644
--- a/dbpg/src/seriesbychannel.rs
+++ b/dbpg/src/seriesbychannel.rs
@@ -108,7 +108,7 @@ impl Worker {
         batch_rx: Receiver<Vec<ChannelInfoQuery>>,
         stats: Arc<SeriesByChannelStats>,
     ) -> Result<Self, Error> {
-        let pg = crate::conn::make_pg_client(db).await?;
+        let (pg, jh) = crate::conn::make_pg_client(db).await?;
         let sql = concat!(
             "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])",
             " as inp (backend, channel, scalar_type, shape_dims, rid))",
@@ -290,7 +290,8 @@ impl Worker {
 
     async fn work(&mut self) -> Result<(), Error> {
         while let Some(batch) = self.batch_rx.next().await {
-            trace!("worker recv batch len {}", batch.len());
+            self.stats.recv_batch().inc();
+            self.stats.recv_items().add(batch.len() as _);
             for x in &batch {
                 trace3!(
                     "search for {} {} {:?} {:?}",
diff --git a/ingest-bsread/src/bsreadclient.rs b/ingest-bsread/src/bsreadclient.rs
index d4c855a..33638e5 100644
--- a/ingest-bsread/src/bsreadclient.rs
+++ b/ingest-bsread/src/bsreadclient.rs
@@ -189,6 +189,7 @@ impl BsreadClient {
                     scalar_type,
                     shape,
                     val: DataValue::Array(ArrayValue::Bool(evtset)),
+                    ts_local: ts,
                 };
                 let item = QueryItem::Insert(item);
                 match self.insqtx.send(item).await {
diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index fba3be5..e7df175 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -509,6 +509,7 @@ impl CaConn {
         remote_addr_dbg: SocketAddrV4,
         local_epics_hostname: String,
         channel_info_query_tx: Sender<ChannelInfoQuery>,
+        stats: Arc<CaConnStats>,
     ) -> Self {
         let (cq_tx, cq_rx) = async_channel::bounded(32);
         Self {
@@ -527,7 +528,7 @@ impl CaConn {
             insert_item_queue: VecDeque::new(),
             remote_addr_dbg,
             local_epics_hostname,
-            stats: Arc::new(CaConnStats::new()),
+            stats,
             insert_ivl_min_mus: 1000 * 6,
             conn_command_tx: cq_tx,
             conn_command_rx: cq_rx,
@@ -893,24 +894,27 @@ impl CaConn {
     fn check_channels_alive(&mut self) -> Result<(), Error> {
         let tsnow = Instant::now();
         trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
-        if self.ioc_ping_last.elapsed() > Duration::from_millis(20000) {
-            if let Some(started) = self.ioc_ping_start {
-                if started.elapsed() > Duration::from_millis(4000) {
-                    warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
-                    let item = CaConnEvent {
-                        ts: Instant::now(),
-                        value: CaConnEventValue::EchoTimeout,
-                    };
-                    self.ca_conn_event_out_queue.push_back(item);
-                    self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout);
-                }
-            } else {
-                self.ioc_ping_start = Some(Instant::now());
+        if let Some(started) = self.ioc_ping_start {
+            if started.elapsed() >= Duration::from_millis(4000) {
+                self.stats.pong_timeout().inc();
+                self.ioc_ping_start = None;
+                warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
+                let item = CaConnEvent {
+                    ts: tsnow,
+                    value: CaConnEventValue::EchoTimeout,
+                };
+                self.ca_conn_event_out_queue.push_back(item);
+                self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout);
+            }
+        } else {
+            if self.ioc_ping_last.elapsed() > Duration::from_millis(20000) {
                 if let Some(proto) = &mut self.proto {
-                    trace2!("ping to {}", self.remote_addr_dbg);
+                    self.stats.ping_start().inc();
+                    self.ioc_ping_start = Some(Instant::now());
                     let msg = CaMsg { ty: CaMsgTy::Echo };
                     proto.push_out(msg);
                 } else {
+                    self.stats.ping_no_proto().inc();
                     warn!("can not ping {} no proto", self.remote_addr_dbg);
                     self.trigger_shutdown(ChannelStatusClosedReason::NoProtocol);
                 }
             }
@@ -1057,6 +1061,7 @@ impl CaConn {
         scalar_type: ScalarType,
         shape: Shape,
         ts: u64,
+        ts_local: u64,
         ev: proto::EventAddRes,
         item_queue: &mut VecDeque<QueryItem>,
         ts_msp_last: u64,
@@ -1090,6 +1095,7 @@ impl CaConn {
             shape,
             val: ev.value.data.into(),
             ts_msp_grid,
+            ts_local,
         };
         item_queue.push_back(QueryItem::Insert(item));
         stats.insert_item_create.inc();
@@ -1102,6 +1108,7 @@ impl CaConn {
         scalar_type: ScalarType,
         shape: Shape,
         ts: u64,
+        ts_local: u64,
         ev: proto::EventAddRes,
         tsnow: Instant,
         item_queue: &mut VecDeque<QueryItem>,
@@ -1136,6 +1143,7 @@ impl CaConn {
                     scalar_type.clone(),
                     shape.clone(),
                     ts - 1 - i as u64,
+                    ts_local - 1 - i as u64,
                     ev.clone(),
                     item_queue,
                     ts_msp_last,
@@ -1150,6 +1158,7 @@ impl CaConn {
             scalar_type,
             shape,
             ts,
+            ts_local,
             ev,
             item_queue,
             ts_msp_last,
@@ -1265,6 +1274,7 @@ impl CaConn {
             scalar_type,
             shape,
             ts,
+            ts_local,
             ev,
             tsnow,
             item_queue,
@@ -1504,16 +1514,28 @@ impl CaConn {
             }
             CaMsgTy::AccessRightsRes(_) => {}
             CaMsgTy::Echo => {
-                let addr = &self.remote_addr_dbg;
+                // let addr = &self.remote_addr_dbg;
                 if let Some(started) = self.ioc_ping_start {
                     let dt = started.elapsed().as_secs_f32() * 1e3;
-                    if dt > 50. {
-                        info!("Received Echo {dt:10.0}ms {addr:?}");
-                    } else if dt > 500. {
-                        warn!("Received Echo {dt:10.0}ms {addr:?}");
+                    if dt <= 10. {
+                        self.stats.pong_recv_010ms().inc();
+                    } else if dt <= 25. {
+                        self.stats.pong_recv_025ms().inc();
+                    } else if dt <= 50. {
+                        self.stats.pong_recv_050ms().inc();
+                    } else if dt <= 100. {
+                        self.stats.pong_recv_100ms().inc();
+                    } else if dt <= 200. {
+                        self.stats.pong_recv_200ms().inc();
+                    } else if dt <= 400. {
+                        self.stats.pong_recv_400ms().inc();
+                    } else {
+                        self.stats.pong_recv_slow().inc();
+                        // warn!("Received Echo {dt:10.0}ms {addr:?}");
                     }
                 } else {
-                    info!("Received Echo even though we didn't asked for it {addr:?}");
+                    let addr = &self.remote_addr_dbg;
+                    warn!("Received Echo even though we didn't ask for it {addr:?}");
                 }
                 self.ioc_ping_last = Instant::now();
                 self.ioc_ping_start = None;
@@ -1786,8 +1808,8 @@ impl Stream for CaConn {
         let poll_ts1 = Instant::now();
         let ret = loop {
             let qlen = self.insert_item_queue.len();
-            if qlen >= 200 {
-                self.thr_msg_poll.trigger_fmt("CaConn::poll_next", &[&qlen]);
+            if qlen > self.opts.insert_queue_max / 3 {
+                self.stats.insert_item_queue_pressure().inc();
             }
             break if let CaConnState::EndOfStream = self.state {
                 Ready(None)
diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs
index 6a9251a..c09288d 100644
--- a/netfetch/src/ca/connset.rs
+++ b/netfetch/src/ca/connset.rs
@@ -196,6 +196,7 @@ pub struct CaConnSetCtrl {
     tx: Sender,
     rx: Receiver,
     stats: Arc<CaConnSetStats>,
+    ca_conn_stats: Arc<CaConnStats>,
     jh: JoinHandle<Result<(), Error>>,
 }
@@ -246,6 +247,10 @@ impl CaConnSetCtrl {
     pub fn stats(&self) -> &Arc<CaConnSetStats> {
         &self.stats
     }
+
+    pub fn ca_conn_stats(&self) -> &Arc<CaConnStats> {
+        &self.ca_conn_stats
+    }
 }
 
 #[derive(Debug)]
@@ -295,6 +300,7 @@ pub struct CaConnSet {
     shutdown_done: bool,
     chan_check_next: Option,
     stats: Arc<CaConnSetStats>,
+    ca_conn_stats: Arc<CaConnStats>,
     ioc_finder_jh: JoinHandle<Result<(), Error>>,
     await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle<Result<(), Error>>)>,
     thr_msg_poll_1: ThrottleTrace,
@@ -318,6 +324,10 @@ impl CaConnSet {
             super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf);
         let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400);
         let stats = Arc::new(CaConnSetStats::new());
+        let ca_conn_stats = Arc::new(CaConnStats::new());
+        stats.test_1().inc();
+        stats.test_1().inc();
+        stats.test_1().inc();
         let connset = Self {
             backend,
             local_epics_hostname,
@@ -342,6 +352,7 @@ impl CaConnSet {
             shutdown_done: false,
             chan_check_next: None,
             stats: stats.clone(),
+            ca_conn_stats: ca_conn_stats.clone(),
             connset_out_tx,
             connset_out_queue: VecDeque::new(),
             // connset_out_sender: SenderPolling::new(connset_out_tx),
@@ -357,6 +368,7 @@ impl CaConnSet {
             tx: connset_inp_tx,
             rx: connset_out_rx,
             stats,
+            ca_conn_stats,
             jh,
         }
     }
@@ -608,7 +620,7 @@ impl CaConnSet {
             return Ok(());
         }
         self.thr_msg_storage_len
-            .trigger_fmt("msg", &[&self.storage_insert_sender.len()]);
+            .trigger("msg", &[&self.storage_insert_sender.len()]);
         debug!("TODO handle_check_health");
 
         // Trigger already the next health check, but use the current data that we have.
@@ -726,6 +738,7 @@ impl CaConnSet {
             addr_v4,
             add.local_epics_hostname,
             self.channel_info_query_tx.clone(),
+            self.ca_conn_stats.clone(),
         );
         let conn_tx = conn.conn_command_tx();
         let conn_stats = conn.stats();
@@ -1136,16 +1149,14 @@ impl Stream for CaConnSet {
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
         self.stats.poll_fn_begin().inc();
-        debug!("CaConnSet poll");
         loop {
             self.stats.poll_loop_begin().inc();
-            let n1 = self.channel_info_query_queue.len();
-            let p2 = self.channel_info_query_sender.len();
-            let p3 = self.channel_info_res_rx.len();
-            self.thr_msg_poll_1
-                .trigger_fmt("CaConnSet channel_info_query_queue", &[&n1, &p2, &p3]);
+            self.thr_msg_poll_1.trigger("CaConnSet::poll", &[]);
 
             self.stats.storage_insert_tx_len.set(self.storage_insert_tx.len() as _);
+            self.stats
+                .channel_info_query_queue_len
+                .set(self.channel_info_query_queue.len() as _);
             self.stats
                 .channel_info_query_sender_len
                 .set(self.channel_info_query_sender.len().unwrap_or(0) as _);
@@ -1158,6 +1169,7 @@ impl Stream for CaConnSet {
             self.stats.ca_conn_res_tx_len.set(self.ca_conn_res_tx.len() as _);
 
             let mut have_pending = false;
+            let mut have_progress = false;
 
             self.try_push_ca_conn_cmds();
 
@@ -1190,6 +1202,7 @@ impl Stream for CaConnSet {
                             error!("CaConn {addr} join error: {e}");
                         }
                     }
+                    have_progress = true;
                 }
                 Pending => {
                     have_pending = true;
@@ -1206,7 +1219,9 @@ impl Stream for CaConnSet {
             }
             if self.storage_insert_sender.is_sending() {
                 match self.storage_insert_sender.poll_unpin(cx) {
-                    Ready(Ok(())) => {}
+                    Ready(Ok(())) => {
+                        have_progress = true;
+                    }
                     Ready(Err(_)) => {
                         let e = Error::with_msg_no_trace("can not send into channel");
                         error!("{e}");
@@ -1225,7 +1240,9 @@ impl Stream for CaConnSet {
             }
             if self.find_ioc_query_sender.is_sending() {
                 match self.find_ioc_query_sender.poll_unpin(cx) {
-                    Ready(Ok(())) => {}
+                    Ready(Ok(())) => {
+                        have_progress = true;
+                    }
                     Ready(Err(_)) => {
                         let e = Error::with_msg_no_trace("can not send into channel");
                         error!("{e}");
@@ -1244,7 +1261,9 @@ impl Stream for CaConnSet {
             }
             if self.channel_info_query_sender.is_sending() {
                 match self.channel_info_query_sender.poll_unpin(cx) {
-                    Ready(Ok(())) => {}
+                    Ready(Ok(())) => {
+                        have_progress = true;
+                    }
                     Ready(Err(_)) => {
                         let e = Error::with_msg_no_trace("can not send into channel");
                         error!("{e}");
@@ -1256,84 +1275,79 @@ impl Stream for CaConnSet {
                 }
             }
 
-            let item = match self.find_ioc_res_rx.poll_next_unpin(cx) {
+            match self.find_ioc_res_rx.poll_next_unpin(cx) {
                 Ready(Some(x)) => match self.handle_ioc_query_result(x) {
-                    Ok(()) => Ready(None),
-                    Err(e) => Ready(Some(CaConnSetItem::Error(e))),
+                    Ok(()) => {
+                        have_progress = true;
+                    }
+                    Err(e) => break Ready(Some(CaConnSetItem::Error(e))),
                 },
-                Ready(None) => Ready(None),
+                Ready(None) => {}
                 Pending => {
                     have_pending = true;
-                    Pending
                 }
-            };
-            match item {
-                Ready(Some(x)) => break Ready(Some(x)),
-                _ => {}
             }
 
-            let item = match self.ca_conn_res_rx.poll_next_unpin(cx) {
+            match self.ca_conn_res_rx.poll_next_unpin(cx) {
                 Ready(Some((addr, ev))) => match self.handle_ca_conn_event(addr, ev) {
-                    Ok(()) => Ready(None),
-                    Err(e) => Ready(Some(CaConnSetItem::Error(e))),
+                    Ok(()) => {
+                        have_progress = true;
+                    }
+                    Err(e) => break Ready(Some(CaConnSetItem::Error(e))),
                 },
-                Ready(None) => Ready(None),
+                Ready(None) => {}
                 Pending => {
                     have_pending = true;
-                    Pending
                 }
-            };
-            match item {
-                Ready(Some(x)) => break Ready(Some(x)),
-                _ => {}
             }
 
-            let item = match self.channel_info_res_rx.poll_next_unpin(cx) {
+            match self.channel_info_res_rx.poll_next_unpin(cx) {
                 Ready(Some(x)) => match self.handle_series_lookup_result(x) {
-                    Ok(()) => Ready(None),
-                    Err(e) => Ready(Some(CaConnSetItem::Error(e))),
+                    Ok(()) => {
+                        have_progress = true;
+                    }
+                    Err(e) => break Ready(Some(CaConnSetItem::Error(e))),
                 },
-                Ready(None) => Ready(None),
+                Ready(None) => {}
                 Pending => {
                     have_pending = true;
-                    Pending
                 }
-            };
-            match item {
-                Ready(Some(x)) => break Ready(Some(x)),
-                _ => {}
             }
 
-            let item = match self.connset_inp_rx.poll_next_unpin(cx) {
+            match self.connset_inp_rx.poll_next_unpin(cx) {
                 Ready(Some(x)) => match self.handle_event(x) {
-                    Ok(()) => Ready(None),
-                    Err(e) => Ready(Some(CaConnSetItem::Error(e))),
+                    Ok(()) => {
+                        have_progress = true;
+                    }
+                    Err(e) => break Ready(Some(CaConnSetItem::Error(e))),
                 },
-                Ready(None) => Ready(None),
+                Ready(None) => {}
                 Pending => {
                     have_pending = true;
-                    Pending
                 }
-            };
-            match item {
-                Ready(Some(x)) => break Ready(Some(x)),
-                _ => {}
             }
 
             break if self.ready_for_end_of_stream() {
-                if have_pending {
-                    self.stats.ready_for_end_of_stream_with_pending().inc();
+                self.stats.ready_for_end_of_stream().inc();
+                if have_progress {
+                    self.stats.ready_for_end_of_stream_with_progress().inc();
+                    continue;
                 } else {
-                    self.stats.ready_for_end_of_stream_no_pending().inc();
+                    Ready(None)
                 }
-                Ready(None)
             } else {
-                if have_pending {
-                    self.stats.poll_pending().inc();
-                    Pending
-                } else {
+                if have_progress {
                     self.stats.poll_reloop().inc();
                     continue;
+                } else {
+                    if have_pending {
+                        self.stats.poll_pending().inc();
+                        Pending
+                    } else {
+                        self.stats.poll_no_progress_no_pending().inc();
+                        let e = Error::with_msg_no_trace("no progress no pending");
+                        Ready(Some(CaConnSetItem::Error(e)))
+                    }
                 }
             };
         }
diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs
index db21a4a..b56ce84 100644
--- a/netfetch/src/ca/finder.rs
+++ b/netfetch/src/ca/finder.rs
@@ -89,7 +89,7 @@ async fn finder_worker_single(
     backend: String,
     db: Database,
 ) -> Result<(), Error> {
-    let pg = make_pg_client(&db)
+    let (pg, jh) = make_pg_client(&db)
         .await
         .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
     let sql = concat!(
diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs
index 7d28a7b..6aa7ef4 100644
--- a/netfetch/src/ca/search.rs
+++ b/netfetch/src/ca/search.rs
@@ -70,7 +70,7 @@ impl DbUpdateWorker {
 
 pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(), Error> {
     info!("ca_search begin");
-    let pg = dbpg::conn::make_pg_client(opts.postgresql_config())
+    let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
         .await
         .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
     dbpg::schema::schema_check(&pg)
@@ -123,7 +123,7 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(),
 
     let mut dbworkers = Vec::new();
     for _ in 0..DB_WORKER_COUNT {
-        let pg = dbpg::conn::make_pg_client(opts.postgresql_config())
+        let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
             .await
             .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
         let w = DbUpdateWorker::new(dbrx.clone(), opts.backend().into(), pg).await?;
diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs
index df8da87..9171bbc 100644
--- a/netfetch/src/conf.rs
+++ b/netfetch/src/conf.rs
@@ -26,7 +26,7 @@ pub struct CaIngestOpts {
     timeout: Option,
     postgresql: Database,
     scylla: ScyllaConfig,
-    array_truncate: Option<usize>,
+    array_truncate: Option<u64>,
     insert_worker_count: Option<usize>,
     insert_scylla_sessions: Option<usize>,
     insert_queue_max: Option<usize>,
@@ -87,7 +87,7 @@ impl CaIngestOpts {
         self.insert_queue_max.unwrap_or(64)
     }
 
-    pub fn array_truncate(&self) -> usize {
+    pub fn array_truncate(&self) -> u64 {
         self.array_truncate.unwrap_or(512)
     }
 
diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs
index c479bdc..9ae82d5 100644
--- a/netfetch/src/metrics.rs
+++ b/netfetch/src/metrics.rs
@@ -21,8 +21,10 @@ use stats::CaConnStatsAgg;
 use stats::CaConnStatsAggDiff;
 use stats::DaemonStats;
 use stats::InsertWorkerStats;
+use stats::SeriesByChannelStats;
 use std::collections::HashMap;
 use std::net::SocketAddrV4;
+use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::time::Duration;
@@ -31,19 +33,28 @@ use taskrun::tokio;
 pub struct StatsSet {
     daemon: Arc<DaemonStats>,
     ca_conn_set: Arc<CaConnSetStats>,
+    ca_conn: Arc<CaConnStats>,
     insert_worker_stats: Arc<InsertWorkerStats>,
+    series_by_channel_stats: Arc<SeriesByChannelStats>,
+    insert_frac: Arc<AtomicU64>,
 }
 
 impl StatsSet {
     pub fn new(
         daemon: Arc<DaemonStats>,
         ca_conn_set: Arc<CaConnSetStats>,
+        ca_conn: Arc<CaConnStats>,
         insert_worker_stats: Arc<InsertWorkerStats>,
+        series_by_channel_stats: Arc<SeriesByChannelStats>,
+        insert_frac: Arc<AtomicU64>,
     ) -> Self {
         Self {
             daemon,
             ca_conn_set,
+            ca_conn,
             insert_worker_stats,
+            series_by_channel_stats,
+            insert_frac,
         }
     }
 }
@@ -198,8 +209,12 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st
                     let mut s1 = stats_set.daemon.prometheus();
                     let s2 = stats_set.ca_conn_set.prometheus();
                     let s3 = stats_set.insert_worker_stats.prometheus();
+                    let s4 = stats_set.ca_conn.prometheus();
+                    let s5 = stats_set.series_by_channel_stats.prometheus();
                     s1.push_str(&s2);
                     s1.push_str(&s3);
+                    s1.push_str(&s4);
+                    s1.push_str(&s5);
                     s1
                 }
             }),
@@ -241,7 +256,7 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st
             }),
         )
         .route(
-            "/store_workers_rate",
+            "/daqingest/store_workers_rate",
             get({
                 let dcom = dcom.clone();
                 || async move { axum::Json(123) }
             })
             .put({
                 ...
             }),
         )
         .route(
-            "/insert_frac",
+            "/daqingest/insert_frac",
             get({
-                let dcom = dcom.clone();
-                || async move { axum::Json(123) }
+                let insert_frac = stats_set.insert_frac.clone();
+                || async move { axum::Json(insert_frac.load(Ordering::Acquire)) }
             })
             .put({
-                let dcom = dcom.clone();
-                |v: extract::Json| async move {}
+                let insert_frac = stats_set.insert_frac.clone();
+                |v: extract::Json| async move {
+                    insert_frac.store(v.0, Ordering::Release);
+                }
             }),
         )
         .route(
-            "/extra_inserts_conf",
+            "/daqingest/extra_inserts_conf",
             get({
                 let dcom = dcom.clone();
                 || async move { axum::Json(serde_json::to_value(&"TODO").unwrap()) }
             })
             .put({
                 ...
             }),
         )
         .route(
-            "/insert_ivl_min",
+            "/daqingest/insert_ivl_min",
             put({
                 let dcom = dcom.clone();
                 |v: extract::Json| async move {}
diff --git a/netfetch/src/throttletrace.rs b/netfetch/src/throttletrace.rs
index c5ba9b9..2c6a0c4 100644
--- a/netfetch/src/throttletrace.rs
+++ b/netfetch/src/throttletrace.rs
@@ -18,16 +18,7 @@ impl ThrottleTrace {
         }
     }
 
trigger(&mut self, msg: &str) { - self.count += 1; - let tsnow = Instant::now(); - if self.next <= tsnow { - self.next = tsnow + self.ivl; - debug!("{} (count {})", msg, self.count); - } - } - - pub fn trigger_fmt(&mut self, msg: &str, params: &[&dyn fmt::Debug]) { + pub fn trigger(&mut self, msg: &str, params: &[&dyn fmt::Debug]) { self.count += 1; let tsnow = Instant::now(); if self.next <= tsnow { diff --git a/netfetch/src/timebin.rs b/netfetch/src/timebin.rs index fb22ede..3f03782 100644 --- a/netfetch/src/timebin.rs +++ b/netfetch/src/timebin.rs @@ -143,7 +143,7 @@ impl ConnTimeBin { trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); } _ => { - warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); + trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); } } } @@ -151,12 +151,12 @@ impl ConnTimeBin { //type Cont = EventsDim1; match scalar_type { _ => { - warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); + trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); } } } _ => { - warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); + trace2!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); } } Ok(()) diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index a5eb9c1..98eba81 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -10,7 +10,6 @@ use log::*; use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::ScyllaConfig; -use stats::CaConnStats; use stats::InsertWorkerStats; use std::sync::atomic; use std::sync::atomic::AtomicU64; @@ -18,9 +17,19 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use std::time::Instant; +use std::time::SystemTime; use taskrun::tokio; use taskrun::tokio::task::JoinHandle; +#[allow(unused)] +macro_rules! 
+macro_rules! trace2 {
+    ($($arg:tt)*) => {
+        if false {
+            trace!($($arg)*);
+        }
+    };
+}
+
 fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqueue::Error) {
     use crate::iteminsertqueue::Error;
     match err {
@@ -70,6 +79,7 @@ pub struct InsertWorkerOpts {
     pub store_workers_rate: Arc<AtomicU64>,
     pub insert_workers_running: Arc<AtomicU64>,
     pub insert_frac: Arc<AtomicU64>,
+    pub array_truncate: Arc<AtomicU64>,
 }
 
 async fn rate_limiter_worker(
@@ -139,6 +149,7 @@ async fn worker(
     data_store: Arc<DataStore>,
     stats: Arc<InsertWorkerStats>,
 ) -> Result<(), Error> {
+    stats.worker_start().inc();
     insert_worker_opts
         .insert_workers_running
         .fetch_add(1, atomic::Ordering::AcqRel);
@@ -178,23 +189,43 @@ async fn worker(
                 }
             }
             QueryItem::Insert(item) => {
-                    if true {
+                    let tsnow = {
+                        let ts = SystemTime::now();
+                        let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
+                        epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
+                    };
+                    let dt = (tsnow / 1000) as i64 - (item.ts_local / 1000) as i64;
+                    if dt < 0 {
+                        stats.item_latency_neg().inc();
+                    } else if dt <= 1000 * 25 {
+                        stats.item_latency_025ms().inc();
+                    } else if dt <= 1000 * 50 {
+                        stats.item_latency_050ms().inc();
+                    } else if dt <= 1000 * 100 {
+                        stats.item_latency_100ms().inc();
+                    } else if dt <= 1000 * 200 {
+                        stats.item_latency_200ms().inc();
+                    } else if dt <= 1000 * 400 {
+                        stats.item_latency_400ms().inc();
+                    } else if dt <= 1000 * 800 {
+                        stats.item_latency_800ms().inc();
+                    } else {
+                        stats.item_latency_large().inc();
+                    }
+                    if false {
                         stats.inserted_values().inc();
                     } else {
                         let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
-                        if i1 % 1000 < insert_frac {
-                            match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await {
-                                Ok(_) => {
-                                    stats.inserted_values().inc();
-                                    backoff = backoff_0;
-                                }
-                                Err(e) => {
-                                    stats_inc_for_err(&stats, &e);
-                                    back_off_sleep(&mut backoff).await;
-                                }
+                        let do_insert = i1 % 1000 < insert_frac;
+                        match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats, do_insert).await {
+                            Ok(_) => {
+                                stats.inserted_values().inc();
+                                backoff = backoff_0;
+                            }
+                            Err(e) => {
+                                stats_inc_for_err(&stats, &e);
+                                back_off_sleep(&mut backoff).await;
                             }
-                        } else {
-                            stats.fraction_drop().inc();
                         }
                         i1 += 1;
                     }
@@ -297,10 +328,11 @@ async fn worker(
             }
         }
     }
+    stats.worker_finish().inc();
     insert_worker_opts
         .insert_workers_running
         .fetch_sub(1, atomic::Ordering::AcqRel);
-    trace!("insert worker {worker_ix} done");
+    trace2!("insert worker {worker_ix} done");
     Ok(())
 }
diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs
index a49b3a2..2232fba 100644
--- a/scywr/src/iteminsertqueue.rs
+++ b/scywr/src/iteminsertqueue.rs
@@ -192,6 +192,7 @@ pub struct InsertItem {
     pub scalar_type: ScalarType,
     pub shape: Shape,
     pub val: DataValue,
+    pub ts_local: u64,
 }
 
 #[derive(Debug)]
@@ -326,6 +327,7 @@ struct InsParCom {
     ts_lsp: u64,
     pulse: u64,
     ttl: u32,
+    do_insert: bool,
 }
 
 async fn insert_scalar_gen<ST>(
@@ -345,18 +347,22 @@ where
         val,
         par.ttl as i32,
     );
-    let y = data_store.scy.execute(qu, params).await;
-    match y {
-        Ok(_) => Ok(()),
-        Err(e) => match e {
-            QueryError::TimeoutError => Err(Error::DbTimeout),
-            // TODO use `msg`
-            QueryError::DbError(e, _msg) => match e {
-                DbError::Overloaded => Err(Error::DbOverload),
+    if par.do_insert {
+        let y = data_store.scy.execute(qu, params).await;
+        match y {
+            Ok(_) => Ok(()),
+            Err(e) => match e {
+                QueryError::TimeoutError => Err(Error::DbTimeout),
+                // TODO use `msg`
+                QueryError::DbError(e, _msg) => match e {
+                    DbError::Overloaded => Err(Error::DbOverload),
+                    _ => Err(e.into()),
+                },
+                _ => Err(e.into()),
             },
-            _ => Err(e.into()),
-        },
+        }
+    } else {
+        Ok(())
     }
 }
 
 async fn insert_array_gen<ST>(
@@ -369,16 +375,20 @@ where
     ST: scylla::frame::value::Value,
 {
-    let params = (
-        par.series as i64,
-        par.ts_msp as i64,
-        par.ts_lsp as i64,
-        par.pulse as i64,
-        val,
-        par.ttl as i32,
-    );
-    data_store.scy.execute(qu, params).await?;
-    Ok(())
+    if par.do_insert {
+        let params = (
+            par.series as i64,
+            par.ts_msp as i64,
+            par.ts_lsp as i64,
+            par.pulse as i64,
+            val,
+            par.ttl as i32,
+        );
+        data_store.scy.execute(qu, params).await?;
+        Ok(())
+    } else {
+        Ok(())
+    }
 }
 
 static warn_last: AtomicU64 = AtomicU64::new(0);
@@ -390,6 +400,7 @@ pub async fn insert_item(
     ttl_1d: Duration,
     data_store: &DataStore,
     stats: &InsertWorkerStats,
+    do_insert: bool,
 ) -> Result<(), Error> {
     if item.msp_bump {
         let params = (item.series.id() as i64, item.ts_msp as i64, ttl_index.as_secs() as i32);
@@ -420,6 +431,7 @@ pub async fn insert_item(
             ts_lsp: item.ts_lsp,
             pulse: item.pulse,
             ttl: ttl_0d.as_secs() as _,
+            do_insert,
         };
         use ScalarValue::*;
         match val {
@@ -456,6 +468,7 @@ pub async fn insert_item(
             ts_lsp: item.ts_lsp,
             pulse: item.pulse,
             ttl: ttl_1d.as_secs() as _,
+            do_insert,
         };
         use ArrayValue::*;
         match val {
diff --git a/stats/src/stats.rs b/stats/src/stats.rs
index 5282440..7b9dd6b 100644
--- a/stats/src/stats.rs
+++ b/stats/src/stats.rs
@@ -229,15 +229,18 @@ stats_proc::stats_struct!((
         channel_wait_for_status_id,
         channel_wait_for_address,
         logic_error,
-        ready_for_end_of_stream_with_pending,
-        ready_for_end_of_stream_no_pending,
+        ready_for_end_of_stream,
+        ready_for_end_of_stream_with_progress,
         poll_fn_begin,
         poll_loop_begin,
         poll_pending,
         poll_reloop,
+        poll_no_progress_no_pending,
+        test_1,
     ),
     values(
         storage_insert_tx_len,
+        channel_info_query_queue_len,
         channel_info_query_sender_len,
         channel_info_res_tx_len,
         find_ioc_query_sender_len,
@@ -249,7 +252,7 @@ stats_proc::stats_struct!((
     stats_struct(
         name(SeriesByChannelStats),
         prefix(seriesbychannel),
-        counters(res_tx_fail, res_tx_timeout,),
+        counters(res_tx_fail, res_tx_timeout, recv_batch, recv_items,),
     ),
     stats_struct(
         name(InsertWorkerStats),
@@ -273,6 +276,16 @@ stats_proc::stats_struct!((
             inserts_msp_grid,
             inserts_value,
             ratelimit_drop,
+            item_latency_neg,
+            item_latency_025ms,
+            item_latency_050ms,
+            item_latency_100ms,
+            item_latency_200ms,
+            item_latency_400ms,
+            item_latency_800ms,
+            item_latency_large,
+            worker_start,
+            worker_finish,
         )
     ),
 ));
@@ -289,6 +302,7 @@ stats_proc::stats_struct!((
         inserts_queue_pop_for_global,
         inserts_queue_push,
         inserts_queue_drop,
+        insert_item_queue_pressure,
         channel_fast_item_drop,
         store_worker_recv_queue_len,
         // TODO maybe rename: this is now only the recv of the intermediate queue:
@@ -331,6 +345,16 @@ stats_proc::stats_struct!((
         ca_ts_off_2,
         ca_ts_off_3,
         ca_ts_off_4,
+        ping_start,
+        ping_no_proto,
+        pong_recv_010ms,
+        pong_recv_025ms,
+        pong_recv_050ms,
+        pong_recv_100ms,
+        pong_recv_200ms,
+        pong_recv_400ms,
+        pong_recv_slow,
+        pong_timeout,
     ),
     values(inter_ivl_ema)
 ),
@@ -361,6 +385,10 @@ stats_proc::stats_struct!((
         ca_conn_status_feedback_no_dst,
         ca_echo_timeout_total,
         caconn_done_channel_state_reset,
+        insert_worker_spawned,
+        insert_worker_join_ok,
+        insert_worker_join_ok_err,
+        insert_worker_join_err,
     ),
     values(
         channel_unknown_address,