From 188660ae2318985c4aee13b366e7487d25926665 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 26 Sep 2024 12:13:14 +0200 Subject: [PATCH] Reduce worker count to reduce postgres connections --- daqingest/src/daemon.rs | 2 +- dbpg/src/seriesbychannel.rs | 2 ++ netfetch/src/ca/finder.rs | 3 ++- netfetch/src/metrics/ingest.rs | 2 +- serieswriter/src/timebin.rs | 2 +- 5 files changed, 7 insertions(+), 4 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index e16f7c4..461a929 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -93,7 +93,7 @@ impl Daemon { // TODO keep join handles and await later let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers::< dbpg::seriesbychannel::SalterRandom, - >(4, &opts.pgconf, series_by_channel_stats.clone()) + >(2, &opts.pgconf, series_by_channel_stats.clone()) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index f8cb9d6..38ca596 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -154,6 +154,7 @@ impl Worker { stats: Arc, ) -> Result { use tokio_postgres::types::Type; + debug!("Worker make_pg_client"); let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?; let sql = concat!( "with q1 as (", @@ -643,6 +644,7 @@ impl HashSalter for SalterRandom { } } +#[cfg(test)] async fn psql_play(db: &Database) -> Result<(), Error> { use tokio_postgres::types::ToSql; use tokio_postgres::types::Type; diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 125482e..1c630d0 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -22,7 +22,7 @@ use std::time::Instant; use taskrun::tokio; use tokio::task::JoinHandle; -const SEARCH_DB_PIPELINE_LEN: usize = 4; +const SEARCH_DB_PIPELINE_LEN: usize = 2; #[allow(unused)] macro_rules! debug_batch { @@ -144,6 +144,7 @@ async fn finder_worker_single( db: Database, stats: Arc, ) -> Result<(), Error> { + debug!("finder_worker_single make_pg_client"); let (pg, jh) = make_pg_client(&db) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index 63435b7..853c93c 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -11,8 +11,8 @@ use err::ThisError; use futures_util::StreamExt; use futures_util::TryStreamExt; use items_2::eventsdim0::EventsDim0; -use items_2::eventsdim0::EventsDim0Enum; use items_2::eventsdim0::EventsDim0NoPulse; +use items_2::eventsdim0enum::EventsDim0Enum; use items_2::eventsdim1::EventsDim1; use items_2::eventsdim1::EventsDim1NoPulse; use netpod::log::*; diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs index 8397cba..0d1d0ed 100644 --- a/serieswriter/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -418,7 +418,7 @@ fn store_bins( .ts1s .iter() .zip(k.ts2s.iter()) - .zip(k.counts.iter()) + .zip(k.cnts.iter()) .zip(k.mins.iter()) .zip(k.maxs.iter()) .zip(k.avgs.iter())