From 321adda3f41f86929689d70360caf604a0893dd7 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 29 Aug 2023 11:22:21 +0200 Subject: [PATCH] Factor out unused finder routine --- daqingest/src/daemon.rs | 130 ------------------------------- daqingest/src/daemon/finder.rs | 137 +++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 130 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 56332b7..235333d 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -195,38 +195,6 @@ impl DaemonOpts { } } -struct OptFut { - fut: Option, -} - -impl OptFut { - fn empty() -> Self { - Self { fut: None } - } - - fn new(fut: F) -> Self { - Self { fut: Some(fut) } - } - - fn is_enabled(&self) -> bool { - self.fut.is_some() - } -} - -impl futures_util::Future for OptFut -where - F: futures_util::Future + std::marker::Unpin, -{ - type Output = ::Output; - - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll { - match self.fut.as_mut() { - Some(fut) => fut.poll_unpin(cx), - None => std::task::Poll::Pending, - } - } -} - pub struct Daemon { opts: DaemonOpts, connection_states: BTreeMap, @@ -391,104 +359,6 @@ impl Daemon { Ok(ret) } - #[allow(unused)] - fn start_finder_ca( - tx: Sender, - tgts: Vec, - ) -> (Sender, tokio::task::JoinHandle<()>) { - let (qtx, qrx) = async_channel::bounded(32); - let (atx, arx) = async_channel::bounded(32); - let ioc_finder_fut = async move { - let mut finder = FindIocStream::new(tgts, FINDER_TIMEOUT, FINDER_IN_FLIGHT_MAX, FINDER_BATCH_SIZE); - let fut_tick_dur = Duration::from_millis(100); - let mut finder_more = true; - let mut finder_fut = OptFut::new(finder.next()); - let mut qrx_fut = OptFut::new(qrx.recv()); - let mut qrx_more = true; - let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); - let mut asend = OptFut::empty(); - loop { - tokio::select! { - _ = &mut asend, if asend.is_enabled() => { - asend = OptFut::empty(); - } - r1 = &mut finder_fut, if finder_fut.is_enabled() => { - finder_fut = OptFut::empty(); - match r1 { - Some(item) => { - asend = OptFut::new(atx.send(item)); - } - None => { - // TODO finder has stopped, do no longer poll on it - warn!("Finder has stopped"); - finder_more = false; - } - } - if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - qrx_fut = OptFut::new(qrx.recv()); - } - if finder_more { - finder_fut = OptFut::new(finder.next()); - } - fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); - } - r2 = &mut qrx_fut, if qrx_fut.is_enabled() => { - qrx_fut = OptFut::empty(); - match r2 { - Ok(item) => { - finder.push(item); - } - Err(e) => { - // TODO input is done... ignore from here on. - error!("Finder input channel error {e}"); - qrx_more = false; - } - } - if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - qrx_fut = OptFut::new(qrx.recv()); - } - if finder_more { - finder_fut = OptFut::new(finder.next()); - } else { - finder_fut = OptFut::empty(); - } - fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); - } - _ = &mut fut_tick => { - if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - qrx_fut = OptFut::new(qrx.recv()); - } - if finder_more { - finder_fut = OptFut::new(finder.next()); - } else { - finder_fut = OptFut::empty(); - } - fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); - } - else => { - error!("all branches are disabled"); - break; - } - }; - } - }; - let ioc_finder_jh = taskrun::spawn(ioc_finder_fut); - taskrun::spawn({ - async move { - while let Ok(item) = arx.recv().await { - match tx.send(DaemonEvent::SearchDone(item)).await { - Ok(_) => {} - Err(e) => { - error!("search res fwd {e}"); - } - } - } - warn!("search res fwd nput broken"); - } - }); - (qtx, ioc_finder_jh) - } - fn stats(&self) -> &Arc { &self.stats } diff --git a/daqingest/src/daemon/finder.rs b/daqingest/src/daemon/finder.rs index d336913..7fec502 100644 --- a/daqingest/src/daemon/finder.rs +++ b/daqingest/src/daemon/finder.rs @@ -1,4 +1,8 @@ +use super::FINDER_BATCH_SIZE; +use super::FINDER_IN_FLIGHT_MAX; +use super::FINDER_TIMEOUT; use crate::daemon::CURRENT_SEARCH_PENDING_MAX; +use crate::daemon::FINDER_JOB_QUEUE_LEN_MAX; use crate::daemon::SEARCH_BATCH_MAX; use crate::daemon::SEARCH_DB_PIPELINE_LEN; use crate::daemon::SEARCH_REQ_BATCH_RECV_COUNT; @@ -11,16 +15,21 @@ use async_channel::Sender; use dbpg::conn::make_pg_client; use dbpg::postgres::Row as PgRow; use err::Error; +use futures_util::FutureExt; +use futures_util::StreamExt; use log::*; use netfetch::ca::findioc::FindIocRes; +use netfetch::ca::findioc::FindIocStream; use netfetch::daemon_common::DaemonEvent; use netpod::Database; use std::collections::HashMap; use std::collections::VecDeque; +use std::net::SocketAddrV4; use std::sync::atomic; use std::time::Duration; use std::time::Instant; use taskrun::tokio; +use tokio::task::JoinHandle; #[allow(unused)] macro_rules! debug_batch { @@ -182,6 +191,7 @@ async fn finder_worker( qrx, ); for _ in 0..SEARCH_DB_PIPELINE_LEN { + // TODO use join handle tokio::spawn(finder_worker_single( batch_rx.clone(), tx.clone(), @@ -201,3 +211,130 @@ pub fn start_finder( let jh = taskrun::spawn(finder_worker(qrx, tx, backend, db)); (qtx, jh) } + +struct OptFut { + fut: Option, +} + +impl OptFut { + fn empty() -> Self { + Self { fut: None } + } + + fn new(fut: F) -> Self { + Self { fut: Some(fut) } + } + + fn is_enabled(&self) -> bool { + self.fut.is_some() + } +} + +impl futures_util::Future for OptFut +where + F: futures_util::Future + std::marker::Unpin, +{ + type Output = ::Output; + + fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll { + match self.fut.as_mut() { + Some(fut) => fut.poll_unpin(cx), + None => std::task::Poll::Pending, + } + } +} + +#[allow(unused)] +fn start_finder_ca(tx: Sender, tgts: Vec) -> (Sender, JoinHandle<()>) { + let (qtx, qrx) = async_channel::bounded(32); + let (atx, arx) = async_channel::bounded(32); + let ioc_finder_fut = async move { + let mut finder = FindIocStream::new(tgts, FINDER_TIMEOUT, FINDER_IN_FLIGHT_MAX, FINDER_BATCH_SIZE); + let fut_tick_dur = Duration::from_millis(100); + let mut finder_more = true; + let mut finder_fut = OptFut::new(finder.next()); + let mut qrx_fut = OptFut::new(qrx.recv()); + let mut qrx_more = true; + let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); + let mut asend = OptFut::empty(); + loop { + tokio::select! { + _ = &mut asend, if asend.is_enabled() => { + asend = OptFut::empty(); + } + r1 = &mut finder_fut, if finder_fut.is_enabled() => { + finder_fut = OptFut::empty(); + match r1 { + Some(item) => { + asend = OptFut::new(atx.send(item)); + } + None => { + // TODO finder has stopped, do no longer poll on it + warn!("Finder has stopped"); + finder_more = false; + } + } + if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { + qrx_fut = OptFut::new(qrx.recv()); + } + if finder_more { + finder_fut = OptFut::new(finder.next()); + } + fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); + } + r2 = &mut qrx_fut, if qrx_fut.is_enabled() => { + qrx_fut = OptFut::empty(); + match r2 { + Ok(item) => { + finder.push(item); + } + Err(e) => { + // TODO input is done... ignore from here on. + error!("Finder input channel error {e}"); + qrx_more = false; + } + } + if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { + qrx_fut = OptFut::new(qrx.recv()); + } + if finder_more { + finder_fut = OptFut::new(finder.next()); + } else { + finder_fut = OptFut::empty(); + } + fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); + } + _ = &mut fut_tick => { + if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { + qrx_fut = OptFut::new(qrx.recv()); + } + if finder_more { + finder_fut = OptFut::new(finder.next()); + } else { + finder_fut = OptFut::empty(); + } + fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); + } + else => { + error!("all branches are disabled"); + break; + } + }; + } + }; + let ioc_finder_jh = taskrun::spawn(ioc_finder_fut); + taskrun::spawn({ + async move { + while let Ok(item) = arx.recv().await { + match tx.send(DaemonEvent::SearchDone(item)).await { + Ok(_) => {} + Err(e) => { + error!("search res fwd {e}"); + } + } + } + warn!("search res fwd nput broken"); + } + }); + (qtx, ioc_finder_jh) +}