diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 3cef69d..56332b7 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -51,6 +51,7 @@ use std::time::Duration; use std::time::Instant; use std::time::SystemTime; use taskrun::tokio; +use tokio::task::JoinHandle; use tokio_postgres::Client as PgClient; use tokio_postgres::Row as PgRow; use tracing::info_span; @@ -234,7 +235,7 @@ pub struct Daemon { rx: Receiver, chan_check_next: Option, search_tx: Sender, - ioc_finder_jh: tokio::task::JoinHandle<()>, + ioc_finder_jh: JoinHandle>, datastore: Arc, common_insert_item_queue: Arc, insert_queue_counter: Arc, @@ -245,7 +246,7 @@ pub struct Daemon { count_unassigned: usize, count_assigned: usize, last_status_print: SystemTime, - insert_workers_jh: Vec>, + insert_workers_jh: Vec>, ingest_commons: Arc, caconn_last_channel_check: Instant, stats: Arc, @@ -256,25 +257,12 @@ pub struct Daemon { impl Daemon { pub async fn new(opts: DaemonOpts) -> Result { - // let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?); let datastore = DataStore::new(&opts.scyconf) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let datastore = Arc::new(datastore); let (tx, rx_daemon_ev) = async_channel::bounded(32); - let pgcs = { - let mut a = Vec::new(); - for _ in 0..SEARCH_DB_PIPELINE_LEN { - let pgc = Arc::new( - make_pg_client(&opts.pgconf) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?, - ); - a.push(pgc); - } - a - }; - let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), opts.backend().into(), pgcs); + let (search_tx, ioc_finder_jh) = finder::start_finder(tx.clone(), opts.backend().into(), opts.pgconf.clone()); // TODO keep join handles and await later let (channel_info_query_tx, ..) = dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf) @@ -403,14 +391,6 @@ impl Daemon { Ok(ret) } - fn start_finder( - tx: Sender, - backend: String, - pgcs: Vec>, - ) -> (Sender, tokio::task::JoinHandle<()>) { - finder::start_finder(tx, backend, pgcs) - } - #[allow(unused)] fn start_finder_ca( tx: Sender, diff --git a/daqingest/src/daemon/finder.rs b/daqingest/src/daemon/finder.rs index 132faca..d336913 100644 --- a/daqingest/src/daemon/finder.rs +++ b/daqingest/src/daemon/finder.rs @@ -6,17 +6,18 @@ use crate::daemon::SEARCH_RES_0_COUNT; use crate::daemon::SEARCH_RES_1_COUNT; use crate::daemon::SEARCH_RES_2_COUNT; use crate::daemon::SEARCH_RES_3_COUNT; +use async_channel::Receiver; use async_channel::Sender; -use dbpg::conn::PgClient; +use dbpg::conn::make_pg_client; use dbpg::postgres::Row as PgRow; -use futures_util::StreamExt; +use err::Error; use log::*; use netfetch::ca::findioc::FindIocRes; use netfetch::daemon_common::DaemonEvent; +use netpod::Database; use std::collections::HashMap; use std::collections::VecDeque; use std::sync::atomic; -use std::sync::Arc; use std::time::Duration; use std::time::Instant; use taskrun::tokio; @@ -37,153 +38,166 @@ macro_rules! trace_batch { }); } +fn transform_pgres(rows: Vec) -> VecDeque { + let mut ret = VecDeque::new(); + for row in rows { + let ch: Result = row.try_get(0); + if let Ok(ch) = ch { + if let Some(addr) = row.get::<_, Option>(1) { + let addr = addr.parse().map_or(None, |x| Some(x)); + let item = FindIocRes { + channel: ch, + response_addr: None, + addr, + dt: Duration::from_millis(0), + }; + ret.push_back(item); + } else { + let item = FindIocRes { + channel: ch, + response_addr: None, + addr: None, + dt: Duration::from_millis(0), + }; + ret.push_back(item); + } + } else if let Err(e) = ch { + error!("bad string from pg: {e:?}"); + } + } + ret +} + +async fn finder_worker_single( + inp: Receiver>, + tx: Sender, + backend: String, + db: Database, +) -> Result<(), Error> { + let pg = make_pg_client(&db) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let sql = concat!( + "with q1 as (select * from unnest($2::text[]) as unn (ch))", + " select distinct on (tt.facility, tt.channel) tt.channel, tt.addr", + " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null", + " order by tt.facility, tt.channel, tsmod desc", + ); + let qu_select_multi = pg + .prepare(sql) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let mut resdiff = 0; + loop { + match inp.recv().await { + Ok(batch) => { + SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel); + let ts1 = Instant::now(); + debug_batch!("run query batch len {}", batch.len()); + let qres = pg.query(&qu_select_multi, &[&backend, &batch]).await; + let dt = ts1.elapsed(); + debug_batch!( + "done query batch len {}: {} {:.3}ms", + batch.len(), + qres.is_ok(), + dt.as_secs_f32() * 1e3 + ); + if dt > Duration::from_millis(5000) { + let mut out = String::from("["); + for s in &batch { + if out.len() > 1 { + out.push_str(", "); + } + out.push('\''); + out.push_str(s); + out.push('\''); + } + out.push(']'); + eprintln!("VERY SLOW QUERY\n{out}"); + } + match qres { + Ok(rows) => { + if rows.len() > batch.len() { + error!("MORE RESULTS THAN INPUT"); + } else if rows.len() < batch.len() { + resdiff += batch.len() - rows.len(); + } + let nbatch = batch.len(); + trace_batch!("received results {} resdiff {}", rows.len(), resdiff); + SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel); + let items = transform_pgres(rows); + let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect(); + let mut to_add = Vec::new(); + for s in batch { + if !names.contains_key(&s) { + let item = FindIocRes { + channel: s, + response_addr: None, + addr: None, + dt: Duration::from_millis(0), + }; + to_add.push(item); + } + } + SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); + SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel); + let mut items = items; + items.extend(to_add.into_iter()); + if items.len() != nbatch { + error!("STILL NOT MATCHING LEN"); + } + SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); + let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await; + match x { + Ok(_) => {} + Err(e) => { + error!("finder sees: {e}"); + break; + } + } + } + Err(e) => { + error!("finder sees error: {e}"); + tokio::time::sleep(Duration::from_millis(1000)).await; + } + } + } + Err(_e) => break, + } + } + Ok(()) +} + +async fn finder_worker( + qrx: Receiver, + tx: Sender, + backend: String, + db: Database, +) -> Result<(), Error> { + // TODO do something with join handle + let (batch_rx, _jh) = batchtools::batcher::batch( + SEARCH_BATCH_MAX, + Duration::from_millis(200), + SEARCH_DB_PIPELINE_LEN, + qrx, + ); + for _ in 0..SEARCH_DB_PIPELINE_LEN { + tokio::spawn(finder_worker_single( + batch_rx.clone(), + tx.clone(), + backend.clone(), + db.clone(), + )); + } + Ok(()) +} + pub fn start_finder( tx: Sender, backend: String, - pgcs: Vec>, -) -> (Sender, tokio::task::JoinHandle<()>) { - fn transform_pgres(rows: Vec) -> VecDeque { - let mut ret = VecDeque::new(); - for row in rows { - let ch: Result = row.try_get(0); - if let Ok(ch) = ch { - if let Some(addr) = row.get::<_, Option>(1) { - let addr = addr.parse().map_or(None, |x| Some(x)); - let item = FindIocRes { - channel: ch, - response_addr: None, - addr, - dt: Duration::from_millis(0), - }; - ret.push_back(item); - } else { - let item = FindIocRes { - channel: ch, - response_addr: None, - addr: None, - dt: Duration::from_millis(0), - }; - ret.push_back(item); - } - } else if let Err(e) = ch { - error!("bad string from pg: {e:?}"); - } - } - ret - } + db: Database, +) -> (Sender, tokio::task::JoinHandle>) { let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX); - let fut = async move { - let (batch_rx, _jh) = batchtools::batcher::batch( - SEARCH_BATCH_MAX, - Duration::from_millis(200), - SEARCH_DB_PIPELINE_LEN, - qrx, - ); - let (pgc_tx, pgc_rx) = async_channel::bounded(128); - for pgc in pgcs { - let sql = concat!( - "with q1 as (select * from unnest($2::text[]) as unn (ch))", - " select distinct on (tt.facility, tt.channel) tt.channel, tt.addr", - " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null", - " order by tt.facility, tt.channel, tsmod desc", - ); - let qu_select_multi = pgc.prepare(sql).await.unwrap(); - let qu_select_multi = Arc::new(qu_select_multi); - match pgc_tx.send((pgc, qu_select_multi)).await { - Ok(_) => {} - Err(e) => { - error!("can not enqueue pgc {e}"); - } - } - } - let backend = Arc::new(backend.clone()); - let stream = batch_rx - .map(|batch: Vec| { - let pgc_tx = pgc_tx.clone(); - let pgc_rx = pgc_rx.clone(); - let backend = backend.clone(); - SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel); - async move { - let ts1 = Instant::now(); - let (pgc, qu_select_multi) = pgc_rx.recv().await.unwrap(); - debug_batch!("run query batch len {}", batch.len()); - let qres = pgc.query(qu_select_multi.as_ref(), &[backend.as_ref(), &batch]).await; - let dt = ts1.elapsed(); - debug_batch!( - "done query batch len {}: {} {:.3}ms", - batch.len(), - qres.is_ok(), - dt.as_secs_f32() * 1e3 - ); - if dt > Duration::from_millis(5000) { - let mut out = String::from("["); - for s in &batch { - if out.len() > 1 { - out.push_str(", "); - } - out.push('\''); - out.push_str(s); - out.push('\''); - } - out.push(']'); - eprintln!("VERY SLOW QUERY\n{out}"); - } - pgc_tx.send((pgc, qu_select_multi)).await.unwrap(); - (batch, qres) - } - }) - .buffer_unordered(SEARCH_DB_PIPELINE_LEN); - let mut resdiff = 0; - let mut stream = Box::pin(stream); - while let Some((batch, pgres)) = stream.next().await { - match pgres { - Ok(rows) => { - if rows.len() > batch.len() { - error!("MORE RESULTS THAN INPUT"); - } else if rows.len() < batch.len() { - resdiff += batch.len() - rows.len(); - } - let nbatch = batch.len(); - trace_batch!("received results {} resdiff {}", rows.len(), resdiff); - SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel); - let items = transform_pgres(rows); - let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect(); - let mut to_add = Vec::new(); - for s in batch { - if !names.contains_key(&s) { - let item = FindIocRes { - channel: s, - response_addr: None, - addr: None, - dt: Duration::from_millis(0), - }; - to_add.push(item); - } - } - SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); - SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel); - let mut items = items; - items.extend(to_add.into_iter()); - if items.len() != nbatch { - error!("STILL NOT MATCHING LEN"); - } - SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); - let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await; - match x { - Ok(_) => {} - Err(e) => { - error!("finder sees: {e}"); - break; - } - } - } - Err(e) => { - error!("finder sees error: {e}"); - tokio::time::sleep(Duration::from_millis(1000)).await; - } - } - } - }; - let jh = taskrun::spawn(fut); + let jh = taskrun::spawn(finder_worker(qrx, tx, backend, db)); (qtx, jh) }