diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs
index 0630c6f..3cef69d 100644
--- a/daqingest/src/daemon.rs
+++ b/daqingest/src/daemon.rs
@@ -1,3 +1,4 @@
+pub mod finder;
 pub mod inserthook;
 pub mod types;
 
@@ -88,22 +89,6 @@ static SEARCH_RES_2_COUNT: AtomicUsize = AtomicUsize::new(0);
 static SEARCH_RES_3_COUNT: AtomicUsize = AtomicUsize::new(0);
 static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0);
 
-#[allow(unused)]
-macro_rules! debug_batch {
-    // (D$($arg:tt)*) => ();
-    ($($arg:tt)*) => (if false {
-        info!($($arg)*);
-    });
-}
-
-#[allow(unused)]
-macro_rules! trace_batch {
-    // (D$($arg:tt)*) => ();
-    ($($arg:tt)*) => (if false {
-        trace!($($arg)*);
-    });
-}
-
 #[derive(Clone, Debug, Serialize)]
 pub enum ConnectionStateValue {
     Unconnected,
@@ -423,150 +408,7 @@ impl Daemon {
         backend: String,
         pgcs: Vec<Arc<PgClient>>,
     ) -> (Sender<String>, tokio::task::JoinHandle<()>) {
-        fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
-            let mut ret = VecDeque::new();
-            for row in rows {
-                let ch: Result<String, _> = row.try_get(0);
-                if let Ok(ch) = ch {
-                    if let Some(addr) = row.get::<_, Option<String>>(1) {
-                        let addr = addr.parse().map_or(None, |x| Some(x));
-                        let item = FindIocRes {
-                            channel: ch,
-                            response_addr: None,
-                            addr,
-                            dt: Duration::from_millis(0),
-                        };
-                        ret.push_back(item);
-                    } else {
-                        let item = FindIocRes {
-                            channel: ch,
-                            response_addr: None,
-                            addr: None,
-                            dt: Duration::from_millis(0),
-                        };
-                        ret.push_back(item);
-                    }
-                } else if let Err(e) = ch {
-                    error!("bad string from pg: {e:?}");
-                }
-            }
-            ret
-        }
-        let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX);
-        let fut = async move {
-            let (batch_rx, _jh) = batchtools::batcher::batch(
-                SEARCH_BATCH_MAX,
-                Duration::from_millis(200),
-                SEARCH_DB_PIPELINE_LEN,
-                qrx,
-            );
-            let (pgc_tx, pgc_rx) = async_channel::bounded(128);
-            for pgc in pgcs {
-                let sql = concat!(
-                    "with q1 as (select * from unnest($2::text[]) as unn (ch))",
-                    " select distinct on (tt.facility, tt.channel) tt.channel, tt.addr",
-                    " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null",
-                    " order by tt.facility, tt.channel, tsmod desc",
-                );
-                let qu_select_multi = pgc.prepare(sql).await.unwrap();
-                let qu_select_multi = Arc::new(qu_select_multi);
-                match pgc_tx.send((pgc, qu_select_multi)).await {
-                    Ok(_) => {}
-                    Err(e) => {
-                        error!("can not enqueue pgc {e}");
-                    }
-                }
-            }
-            let backend = Arc::new(backend.clone());
-            let stream = batch_rx
-                .map(|batch: Vec<String>| {
-                    let pgc_tx = pgc_tx.clone();
-                    let pgc_rx = pgc_rx.clone();
-                    let backend = backend.clone();
-                    SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel);
-                    async move {
-                        let ts1 = Instant::now();
-                        let (pgc, qu_select_multi) = pgc_rx.recv().await.unwrap();
-                        debug_batch!("run query batch len {}", batch.len());
-                        let qres = pgc.query(qu_select_multi.as_ref(), &[backend.as_ref(), &batch]).await;
-                        let dt = ts1.elapsed();
-                        debug_batch!(
-                            "done query batch len {}: {} {:.3}ms",
-                            batch.len(),
-                            qres.is_ok(),
-                            dt.as_secs_f32() * 1e3
-                        );
-                        if dt > Duration::from_millis(5000) {
-                            let mut out = String::from("[");
-                            for s in &batch {
-                                if out.len() > 1 {
-                                    out.push_str(", ");
-                                }
-                                out.push('\'');
-                                out.push_str(s);
-                                out.push('\'');
-                            }
-                            out.push(']');
-                            eprintln!("VERY SLOW QUERY\n{out}");
-                        }
-                        pgc_tx.send((pgc, qu_select_multi)).await.unwrap();
-                        (batch, qres)
-                    }
-                })
-                .buffer_unordered(SEARCH_DB_PIPELINE_LEN);
-            let mut resdiff = 0;
-            let mut stream = Box::pin(stream);
-            while let Some((batch, pgres)) = stream.next().await {
-                match pgres {
-                    Ok(rows) => {
-                        if rows.len() > batch.len() {
-                            error!("MORE RESULTS THAN INPUT");
-                        } else if rows.len() < batch.len() {
-                            resdiff += batch.len() - rows.len();
-                        }
-                        let nbatch = batch.len();
-                        trace_batch!("received results {} resdiff {}", rows.len(), resdiff);
-                        SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel);
-                        let items = transform_pgres(rows);
-                        let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect();
-                        let mut to_add = Vec::new();
-                        for s in batch {
-                            if !names.contains_key(&s) {
-                                let item = FindIocRes {
-                                    channel: s,
-                                    response_addr: None,
-                                    addr: None,
-                                    dt: Duration::from_millis(0),
-                                };
-                                to_add.push(item);
-                            }
-                        }
-                        SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
-                        SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel);
-                        let mut items = items;
-                        items.extend(to_add.into_iter());
-                        if items.len() != nbatch {
-                            error!("STILL NOT MATCHING LEN");
-                        }
-                        SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
-                        let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await;
-                        match x {
-                            Ok(_) => {}
-                            Err(e) => {
-                                error!("finder sees: {e}");
-                                break;
-                            }
-                        }
-                    }
-                    Err(e) => {
-                        error!("finder sees error: {e}");
-                        tokio::time::sleep(Duration::from_millis(1000)).await;
-                    }
-                }
-            }
-        };
-        let jh = taskrun::spawn(fut);
-        (qtx, jh)
+        finder::start_finder(tx, backend, pgcs)
     }
 
     #[allow(unused)]
diff --git a/daqingest/src/daemon/finder.rs b/daqingest/src/daemon/finder.rs
new file mode 100644
index 0000000..132faca
--- /dev/null
+++ b/daqingest/src/daemon/finder.rs
@@ -0,0 +1,189 @@
+use crate::daemon::CURRENT_SEARCH_PENDING_MAX;
+use crate::daemon::SEARCH_BATCH_MAX;
+use crate::daemon::SEARCH_DB_PIPELINE_LEN;
+use crate::daemon::SEARCH_REQ_BATCH_RECV_COUNT;
+use crate::daemon::SEARCH_RES_0_COUNT;
+use crate::daemon::SEARCH_RES_1_COUNT;
+use crate::daemon::SEARCH_RES_2_COUNT;
+use crate::daemon::SEARCH_RES_3_COUNT;
+use async_channel::Sender;
+use dbpg::conn::PgClient;
+use dbpg::postgres::Row as PgRow;
+use futures_util::StreamExt;
+use log::*;
+use netfetch::ca::findioc::FindIocRes;
+use netfetch::daemon_common::DaemonEvent;
+use std::collections::HashMap;
+use std::collections::VecDeque;
+use std::sync::atomic;
+use std::sync::Arc;
+use std::time::Duration;
+use std::time::Instant;
+use taskrun::tokio;
+
+#[allow(unused)]
+macro_rules! debug_batch {
+    // (D$($arg:tt)*) => ();
+    ($($arg:tt)*) => (if false {
+        debug!($($arg)*);
+    });
+}
+
+#[allow(unused)]
+macro_rules! trace_batch {
+    // (D$($arg:tt)*) => ();
+    ($($arg:tt)*) => (if false {
+        trace!($($arg)*);
+    });
+}
+
+pub fn start_finder(
+    tx: Sender<DaemonEvent>,
+    backend: String,
+    pgcs: Vec<Arc<PgClient>>,
+) -> (Sender<String>, tokio::task::JoinHandle<()>) {
+    fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
+        let mut ret = VecDeque::new();
+        for row in rows {
+            let ch: Result<String, _> = row.try_get(0);
+            if let Ok(ch) = ch {
+                if let Some(addr) = row.get::<_, Option<String>>(1) {
+                    let addr = addr.parse().map_or(None, |x| Some(x));
+                    let item = FindIocRes {
+                        channel: ch,
+                        response_addr: None,
+                        addr,
+                        dt: Duration::from_millis(0),
+                    };
+                    ret.push_back(item);
+                } else {
+                    let item = FindIocRes {
+                        channel: ch,
+                        response_addr: None,
+                        addr: None,
+                        dt: Duration::from_millis(0),
+                    };
+                    ret.push_back(item);
+                }
+            } else if let Err(e) = ch {
+                error!("bad string from pg: {e:?}");
+            }
+        }
+        ret
+    }
+    let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX);
+    let fut = async move {
+        let (batch_rx, _jh) = batchtools::batcher::batch(
+            SEARCH_BATCH_MAX,
+            Duration::from_millis(200),
+            SEARCH_DB_PIPELINE_LEN,
+            qrx,
+        );
+        let (pgc_tx, pgc_rx) = async_channel::bounded(128);
+        for pgc in pgcs {
+            let sql = concat!(
+                "with q1 as (select * from unnest($2::text[]) as unn (ch))",
+                " select distinct on (tt.facility, tt.channel) tt.channel, tt.addr",
+                " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null",
+                " order by tt.facility, tt.channel, tsmod desc",
+            );
+            let qu_select_multi = pgc.prepare(sql).await.unwrap();
+            let qu_select_multi = Arc::new(qu_select_multi);
+            match pgc_tx.send((pgc, qu_select_multi)).await {
+                Ok(_) => {}
+                Err(e) => {
+                    error!("can not enqueue pgc {e}");
+                }
+            }
+        }
+        let backend = Arc::new(backend.clone());
+        let stream = batch_rx
+            .map(|batch: Vec<String>| {
+                let pgc_tx = pgc_tx.clone();
+                let pgc_rx = pgc_rx.clone();
+                let backend = backend.clone();
+                SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel);
+                async move {
+                    let ts1 = Instant::now();
+                    let (pgc, qu_select_multi) = pgc_rx.recv().await.unwrap();
+                    debug_batch!("run query batch len {}", batch.len());
+                    let qres = pgc.query(qu_select_multi.as_ref(), &[backend.as_ref(), &batch]).await;
+                    let dt = ts1.elapsed();
+                    debug_batch!(
+                        "done query batch len {}: {} {:.3}ms",
+                        batch.len(),
+                        qres.is_ok(),
+                        dt.as_secs_f32() * 1e3
+                    );
+                    if dt > Duration::from_millis(5000) {
+                        let mut out = String::from("[");
+                        for s in &batch {
+                            if out.len() > 1 {
+                                out.push_str(", ");
+                            }
+                            out.push('\'');
+                            out.push_str(s);
+                            out.push('\'');
+                        }
+                        out.push(']');
+                        eprintln!("VERY SLOW QUERY\n{out}");
+                    }
+                    pgc_tx.send((pgc, qu_select_multi)).await.unwrap();
+                    (batch, qres)
+                }
+            })
+            .buffer_unordered(SEARCH_DB_PIPELINE_LEN);
+        let mut resdiff = 0;
+        let mut stream = Box::pin(stream);
+        while let Some((batch, pgres)) = stream.next().await {
+            match pgres {
+                Ok(rows) => {
+                    if rows.len() > batch.len() {
+                        error!("MORE RESULTS THAN INPUT");
+                    } else if rows.len() < batch.len() {
+                        resdiff += batch.len() - rows.len();
+                    }
+                    let nbatch = batch.len();
+                    trace_batch!("received results {} resdiff {}", rows.len(), resdiff);
+                    SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel);
+                    let items = transform_pgres(rows);
+                    let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect();
+                    let mut to_add = Vec::new();
+                    for s in batch {
+                        if !names.contains_key(&s) {
+                            let item = FindIocRes {
+                                channel: s,
+                                response_addr: None,
+                                addr: None,
+                                dt: Duration::from_millis(0),
+                            };
+                            to_add.push(item);
+                        }
+                    }
+                    SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
+                    SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel);
+                    let mut items = items;
+                    items.extend(to_add.into_iter());
+                    if items.len() != nbatch {
+                        error!("STILL NOT MATCHING LEN");
+                    }
+                    SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
+                    let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await;
+                    match x {
+                        Ok(_) => {}
+                        Err(e) => {
+                            error!("finder sees: {e}");
+                            break;
+                        }
+                    }
+                }
+                Err(e) => {
+                    error!("finder sees error: {e}");
+                    tokio::time::sleep(Duration::from_millis(1000)).await;
+                }
+            }
+        }
+    };
+    let jh = taskrun::spawn(fut);
+    (qtx, jh)
+}
diff --git a/dbpg/src/lib.rs b/dbpg/src/lib.rs
index 22dd88b..1f5fe7c 100644
--- a/dbpg/src/lib.rs
+++ b/dbpg/src/lib.rs
@@ -6,3 +6,5 @@ pub mod pool;
 pub mod schema;
 pub mod seriesbychannel;
 pub mod seriesid;
+
+pub use tokio_postgres as postgres;