Split up finder sub worker
This commit is contained in:
@@ -51,6 +51,7 @@ use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
use taskrun::tokio;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_postgres::Client as PgClient;
|
||||
use tokio_postgres::Row as PgRow;
|
||||
use tracing::info_span;
|
||||
@@ -234,7 +235,7 @@ pub struct Daemon {
|
||||
rx: Receiver<DaemonEvent>,
|
||||
chan_check_next: Option<Channel>,
|
||||
search_tx: Sender<String>,
|
||||
ioc_finder_jh: tokio::task::JoinHandle<()>,
|
||||
ioc_finder_jh: JoinHandle<Result<(), Error>>,
|
||||
datastore: Arc<DataStore>,
|
||||
common_insert_item_queue: Arc<CommonInsertItemQueue>,
|
||||
insert_queue_counter: Arc<AtomicUsize>,
|
||||
@@ -245,7 +246,7 @@ pub struct Daemon {
|
||||
count_unassigned: usize,
|
||||
count_assigned: usize,
|
||||
last_status_print: SystemTime,
|
||||
insert_workers_jh: Vec<tokio::task::JoinHandle<()>>,
|
||||
insert_workers_jh: Vec<JoinHandle<()>>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
caconn_last_channel_check: Instant,
|
||||
stats: Arc<DaemonStats>,
|
||||
@@ -256,25 +257,12 @@ pub struct Daemon {
|
||||
|
||||
impl Daemon {
|
||||
pub async fn new(opts: DaemonOpts) -> Result<Self, Error> {
|
||||
// let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?);
|
||||
let datastore = DataStore::new(&opts.scyconf)
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
let datastore = Arc::new(datastore);
|
||||
let (tx, rx_daemon_ev) = async_channel::bounded(32);
|
||||
let pgcs = {
|
||||
let mut a = Vec::new();
|
||||
for _ in 0..SEARCH_DB_PIPELINE_LEN {
|
||||
let pgc = Arc::new(
|
||||
make_pg_client(&opts.pgconf)
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?,
|
||||
);
|
||||
a.push(pgc);
|
||||
}
|
||||
a
|
||||
};
|
||||
let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), opts.backend().into(), pgcs);
|
||||
let (search_tx, ioc_finder_jh) = finder::start_finder(tx.clone(), opts.backend().into(), opts.pgconf.clone());
|
||||
|
||||
// TODO keep join handles and await later
|
||||
let (channel_info_query_tx, ..) = dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf)
|
||||
@@ -403,14 +391,6 @@ impl Daemon {
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
fn start_finder(
|
||||
tx: Sender<DaemonEvent>,
|
||||
backend: String,
|
||||
pgcs: Vec<Arc<PgClient>>,
|
||||
) -> (Sender<String>, tokio::task::JoinHandle<()>) {
|
||||
finder::start_finder(tx, backend, pgcs)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn start_finder_ca(
|
||||
tx: Sender<DaemonEvent>,
|
||||
|
||||
@@ -6,17 +6,18 @@ use crate::daemon::SEARCH_RES_0_COUNT;
|
||||
use crate::daemon::SEARCH_RES_1_COUNT;
|
||||
use crate::daemon::SEARCH_RES_2_COUNT;
|
||||
use crate::daemon::SEARCH_RES_3_COUNT;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use dbpg::conn::PgClient;
|
||||
use dbpg::conn::make_pg_client;
|
||||
use dbpg::postgres::Row as PgRow;
|
||||
use futures_util::StreamExt;
|
||||
use err::Error;
|
||||
use log::*;
|
||||
use netfetch::ca::findioc::FindIocRes;
|
||||
use netfetch::daemon_common::DaemonEvent;
|
||||
use netpod::Database;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::atomic;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use taskrun::tokio;
|
||||
@@ -37,153 +38,166 @@ macro_rules! trace_batch {
|
||||
});
|
||||
}
|
||||
|
||||
fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
|
||||
let mut ret = VecDeque::new();
|
||||
for row in rows {
|
||||
let ch: Result<String, _> = row.try_get(0);
|
||||
if let Ok(ch) = ch {
|
||||
if let Some(addr) = row.get::<_, Option<String>>(1) {
|
||||
let addr = addr.parse().map_or(None, |x| Some(x));
|
||||
let item = FindIocRes {
|
||||
channel: ch,
|
||||
response_addr: None,
|
||||
addr,
|
||||
dt: Duration::from_millis(0),
|
||||
};
|
||||
ret.push_back(item);
|
||||
} else {
|
||||
let item = FindIocRes {
|
||||
channel: ch,
|
||||
response_addr: None,
|
||||
addr: None,
|
||||
dt: Duration::from_millis(0),
|
||||
};
|
||||
ret.push_back(item);
|
||||
}
|
||||
} else if let Err(e) = ch {
|
||||
error!("bad string from pg: {e:?}");
|
||||
}
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
async fn finder_worker_single(
|
||||
inp: Receiver<Vec<String>>,
|
||||
tx: Sender<DaemonEvent>,
|
||||
backend: String,
|
||||
db: Database,
|
||||
) -> Result<(), Error> {
|
||||
let pg = make_pg_client(&db)
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
let sql = concat!(
|
||||
"with q1 as (select * from unnest($2::text[]) as unn (ch))",
|
||||
" select distinct on (tt.facility, tt.channel) tt.channel, tt.addr",
|
||||
" from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null",
|
||||
" order by tt.facility, tt.channel, tsmod desc",
|
||||
);
|
||||
let qu_select_multi = pg
|
||||
.prepare(sql)
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
let mut resdiff = 0;
|
||||
loop {
|
||||
match inp.recv().await {
|
||||
Ok(batch) => {
|
||||
SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel);
|
||||
let ts1 = Instant::now();
|
||||
debug_batch!("run query batch len {}", batch.len());
|
||||
let qres = pg.query(&qu_select_multi, &[&backend, &batch]).await;
|
||||
let dt = ts1.elapsed();
|
||||
debug_batch!(
|
||||
"done query batch len {}: {} {:.3}ms",
|
||||
batch.len(),
|
||||
qres.is_ok(),
|
||||
dt.as_secs_f32() * 1e3
|
||||
);
|
||||
if dt > Duration::from_millis(5000) {
|
||||
let mut out = String::from("[");
|
||||
for s in &batch {
|
||||
if out.len() > 1 {
|
||||
out.push_str(", ");
|
||||
}
|
||||
out.push('\'');
|
||||
out.push_str(s);
|
||||
out.push('\'');
|
||||
}
|
||||
out.push(']');
|
||||
eprintln!("VERY SLOW QUERY\n{out}");
|
||||
}
|
||||
match qres {
|
||||
Ok(rows) => {
|
||||
if rows.len() > batch.len() {
|
||||
error!("MORE RESULTS THAN INPUT");
|
||||
} else if rows.len() < batch.len() {
|
||||
resdiff += batch.len() - rows.len();
|
||||
}
|
||||
let nbatch = batch.len();
|
||||
trace_batch!("received results {} resdiff {}", rows.len(), resdiff);
|
||||
SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel);
|
||||
let items = transform_pgres(rows);
|
||||
let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect();
|
||||
let mut to_add = Vec::new();
|
||||
for s in batch {
|
||||
if !names.contains_key(&s) {
|
||||
let item = FindIocRes {
|
||||
channel: s,
|
||||
response_addr: None,
|
||||
addr: None,
|
||||
dt: Duration::from_millis(0),
|
||||
};
|
||||
to_add.push(item);
|
||||
}
|
||||
}
|
||||
SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
|
||||
SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel);
|
||||
let mut items = items;
|
||||
items.extend(to_add.into_iter());
|
||||
if items.len() != nbatch {
|
||||
error!("STILL NOT MATCHING LEN");
|
||||
}
|
||||
SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
|
||||
let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await;
|
||||
match x {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("finder sees: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("finder sees error: {e}");
|
||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_e) => break,
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn finder_worker(
|
||||
qrx: Receiver<String>,
|
||||
tx: Sender<DaemonEvent>,
|
||||
backend: String,
|
||||
db: Database,
|
||||
) -> Result<(), Error> {
|
||||
// TODO do something with join handle
|
||||
let (batch_rx, _jh) = batchtools::batcher::batch(
|
||||
SEARCH_BATCH_MAX,
|
||||
Duration::from_millis(200),
|
||||
SEARCH_DB_PIPELINE_LEN,
|
||||
qrx,
|
||||
);
|
||||
for _ in 0..SEARCH_DB_PIPELINE_LEN {
|
||||
tokio::spawn(finder_worker_single(
|
||||
batch_rx.clone(),
|
||||
tx.clone(),
|
||||
backend.clone(),
|
||||
db.clone(),
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn start_finder(
|
||||
tx: Sender<DaemonEvent>,
|
||||
backend: String,
|
||||
pgcs: Vec<Arc<PgClient>>,
|
||||
) -> (Sender<String>, tokio::task::JoinHandle<()>) {
|
||||
fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
|
||||
let mut ret = VecDeque::new();
|
||||
for row in rows {
|
||||
let ch: Result<String, _> = row.try_get(0);
|
||||
if let Ok(ch) = ch {
|
||||
if let Some(addr) = row.get::<_, Option<String>>(1) {
|
||||
let addr = addr.parse().map_or(None, |x| Some(x));
|
||||
let item = FindIocRes {
|
||||
channel: ch,
|
||||
response_addr: None,
|
||||
addr,
|
||||
dt: Duration::from_millis(0),
|
||||
};
|
||||
ret.push_back(item);
|
||||
} else {
|
||||
let item = FindIocRes {
|
||||
channel: ch,
|
||||
response_addr: None,
|
||||
addr: None,
|
||||
dt: Duration::from_millis(0),
|
||||
};
|
||||
ret.push_back(item);
|
||||
}
|
||||
} else if let Err(e) = ch {
|
||||
error!("bad string from pg: {e:?}");
|
||||
}
|
||||
}
|
||||
ret
|
||||
}
|
||||
db: Database,
|
||||
) -> (Sender<String>, tokio::task::JoinHandle<Result<(), Error>>) {
|
||||
let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX);
|
||||
let fut = async move {
|
||||
let (batch_rx, _jh) = batchtools::batcher::batch(
|
||||
SEARCH_BATCH_MAX,
|
||||
Duration::from_millis(200),
|
||||
SEARCH_DB_PIPELINE_LEN,
|
||||
qrx,
|
||||
);
|
||||
let (pgc_tx, pgc_rx) = async_channel::bounded(128);
|
||||
for pgc in pgcs {
|
||||
let sql = concat!(
|
||||
"with q1 as (select * from unnest($2::text[]) as unn (ch))",
|
||||
" select distinct on (tt.facility, tt.channel) tt.channel, tt.addr",
|
||||
" from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null",
|
||||
" order by tt.facility, tt.channel, tsmod desc",
|
||||
);
|
||||
let qu_select_multi = pgc.prepare(sql).await.unwrap();
|
||||
let qu_select_multi = Arc::new(qu_select_multi);
|
||||
match pgc_tx.send((pgc, qu_select_multi)).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("can not enqueue pgc {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
let backend = Arc::new(backend.clone());
|
||||
let stream = batch_rx
|
||||
.map(|batch: Vec<String>| {
|
||||
let pgc_tx = pgc_tx.clone();
|
||||
let pgc_rx = pgc_rx.clone();
|
||||
let backend = backend.clone();
|
||||
SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel);
|
||||
async move {
|
||||
let ts1 = Instant::now();
|
||||
let (pgc, qu_select_multi) = pgc_rx.recv().await.unwrap();
|
||||
debug_batch!("run query batch len {}", batch.len());
|
||||
let qres = pgc.query(qu_select_multi.as_ref(), &[backend.as_ref(), &batch]).await;
|
||||
let dt = ts1.elapsed();
|
||||
debug_batch!(
|
||||
"done query batch len {}: {} {:.3}ms",
|
||||
batch.len(),
|
||||
qres.is_ok(),
|
||||
dt.as_secs_f32() * 1e3
|
||||
);
|
||||
if dt > Duration::from_millis(5000) {
|
||||
let mut out = String::from("[");
|
||||
for s in &batch {
|
||||
if out.len() > 1 {
|
||||
out.push_str(", ");
|
||||
}
|
||||
out.push('\'');
|
||||
out.push_str(s);
|
||||
out.push('\'');
|
||||
}
|
||||
out.push(']');
|
||||
eprintln!("VERY SLOW QUERY\n{out}");
|
||||
}
|
||||
pgc_tx.send((pgc, qu_select_multi)).await.unwrap();
|
||||
(batch, qres)
|
||||
}
|
||||
})
|
||||
.buffer_unordered(SEARCH_DB_PIPELINE_LEN);
|
||||
let mut resdiff = 0;
|
||||
let mut stream = Box::pin(stream);
|
||||
while let Some((batch, pgres)) = stream.next().await {
|
||||
match pgres {
|
||||
Ok(rows) => {
|
||||
if rows.len() > batch.len() {
|
||||
error!("MORE RESULTS THAN INPUT");
|
||||
} else if rows.len() < batch.len() {
|
||||
resdiff += batch.len() - rows.len();
|
||||
}
|
||||
let nbatch = batch.len();
|
||||
trace_batch!("received results {} resdiff {}", rows.len(), resdiff);
|
||||
SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel);
|
||||
let items = transform_pgres(rows);
|
||||
let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect();
|
||||
let mut to_add = Vec::new();
|
||||
for s in batch {
|
||||
if !names.contains_key(&s) {
|
||||
let item = FindIocRes {
|
||||
channel: s,
|
||||
response_addr: None,
|
||||
addr: None,
|
||||
dt: Duration::from_millis(0),
|
||||
};
|
||||
to_add.push(item);
|
||||
}
|
||||
}
|
||||
SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
|
||||
SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel);
|
||||
let mut items = items;
|
||||
items.extend(to_add.into_iter());
|
||||
if items.len() != nbatch {
|
||||
error!("STILL NOT MATCHING LEN");
|
||||
}
|
||||
SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
|
||||
let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await;
|
||||
match x {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("finder sees: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("finder sees error: {e}");
|
||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
let jh = taskrun::spawn(fut);
|
||||
let jh = taskrun::spawn(finder_worker(qrx, tx, backend, db));
|
||||
(qtx, jh)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user