Factor out unused finder routine

This commit is contained in:
Dominik Werder
2023-08-29 11:22:21 +02:00
parent 9733ce5d06
commit 321adda3f4
2 changed files with 137 additions and 130 deletions

View File

@@ -195,38 +195,6 @@ impl DaemonOpts {
}
}
struct OptFut<F> {
fut: Option<F>,
}
impl<F> OptFut<F> {
fn empty() -> Self {
Self { fut: None }
}
fn new(fut: F) -> Self {
Self { fut: Some(fut) }
}
fn is_enabled(&self) -> bool {
self.fut.is_some()
}
}
impl<F> futures_util::Future for OptFut<F>
where
F: futures_util::Future + std::marker::Unpin,
{
type Output = <F as futures_util::Future>::Output;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
match self.fut.as_mut() {
Some(fut) => fut.poll_unpin(cx),
None => std::task::Poll::Pending,
}
}
}
pub struct Daemon {
opts: DaemonOpts,
connection_states: BTreeMap<SocketAddrV4, CaConnState>,
@@ -391,104 +359,6 @@ impl Daemon {
Ok(ret)
}
#[allow(unused)]
fn start_finder_ca(
tx: Sender<DaemonEvent>,
tgts: Vec<SocketAddrV4>,
) -> (Sender<String>, tokio::task::JoinHandle<()>) {
let (qtx, qrx) = async_channel::bounded(32);
let (atx, arx) = async_channel::bounded(32);
let ioc_finder_fut = async move {
let mut finder = FindIocStream::new(tgts, FINDER_TIMEOUT, FINDER_IN_FLIGHT_MAX, FINDER_BATCH_SIZE);
let fut_tick_dur = Duration::from_millis(100);
let mut finder_more = true;
let mut finder_fut = OptFut::new(finder.next());
let mut qrx_fut = OptFut::new(qrx.recv());
let mut qrx_more = true;
let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
let mut asend = OptFut::empty();
loop {
tokio::select! {
_ = &mut asend, if asend.is_enabled() => {
asend = OptFut::empty();
}
r1 = &mut finder_fut, if finder_fut.is_enabled() => {
finder_fut = OptFut::empty();
match r1 {
Some(item) => {
asend = OptFut::new(atx.send(item));
}
None => {
// TODO finder has stopped, do no longer poll on it
warn!("Finder has stopped");
finder_more = false;
}
}
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
r2 = &mut qrx_fut, if qrx_fut.is_enabled() => {
qrx_fut = OptFut::empty();
match r2 {
Ok(item) => {
finder.push(item);
}
Err(e) => {
// TODO input is done... ignore from here on.
error!("Finder input channel error {e}");
qrx_more = false;
}
}
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
} else {
finder_fut = OptFut::empty();
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
_ = &mut fut_tick => {
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
} else {
finder_fut = OptFut::empty();
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
else => {
error!("all branches are disabled");
break;
}
};
}
};
let ioc_finder_jh = taskrun::spawn(ioc_finder_fut);
taskrun::spawn({
async move {
while let Ok(item) = arx.recv().await {
match tx.send(DaemonEvent::SearchDone(item)).await {
Ok(_) => {}
Err(e) => {
error!("search res fwd {e}");
}
}
}
warn!("search res fwd nput broken");
}
});
(qtx, ioc_finder_jh)
}
fn stats(&self) -> &Arc<DaemonStats> {
&self.stats
}

View File

@@ -1,4 +1,8 @@
use super::FINDER_BATCH_SIZE;
use super::FINDER_IN_FLIGHT_MAX;
use super::FINDER_TIMEOUT;
use crate::daemon::CURRENT_SEARCH_PENDING_MAX;
use crate::daemon::FINDER_JOB_QUEUE_LEN_MAX;
use crate::daemon::SEARCH_BATCH_MAX;
use crate::daemon::SEARCH_DB_PIPELINE_LEN;
use crate::daemon::SEARCH_REQ_BATCH_RECV_COUNT;
@@ -11,16 +15,21 @@ use async_channel::Sender;
use dbpg::conn::make_pg_client;
use dbpg::postgres::Row as PgRow;
use err::Error;
use futures_util::FutureExt;
use futures_util::StreamExt;
use log::*;
use netfetch::ca::findioc::FindIocRes;
use netfetch::ca::findioc::FindIocStream;
use netfetch::daemon_common::DaemonEvent;
use netpod::Database;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::sync::atomic;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::task::JoinHandle;
#[allow(unused)]
macro_rules! debug_batch {
@@ -182,6 +191,7 @@ async fn finder_worker(
qrx,
);
for _ in 0..SEARCH_DB_PIPELINE_LEN {
// TODO use join handle
tokio::spawn(finder_worker_single(
batch_rx.clone(),
tx.clone(),
@@ -201,3 +211,130 @@ pub fn start_finder(
let jh = taskrun::spawn(finder_worker(qrx, tx, backend, db));
(qtx, jh)
}
struct OptFut<F> {
fut: Option<F>,
}
impl<F> OptFut<F> {
fn empty() -> Self {
Self { fut: None }
}
fn new(fut: F) -> Self {
Self { fut: Some(fut) }
}
fn is_enabled(&self) -> bool {
self.fut.is_some()
}
}
impl<F> futures_util::Future for OptFut<F>
where
F: futures_util::Future + std::marker::Unpin,
{
type Output = <F as futures_util::Future>::Output;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
match self.fut.as_mut() {
Some(fut) => fut.poll_unpin(cx),
None => std::task::Poll::Pending,
}
}
}
#[allow(unused)]
fn start_finder_ca(tx: Sender<DaemonEvent>, tgts: Vec<SocketAddrV4>) -> (Sender<String>, JoinHandle<()>) {
let (qtx, qrx) = async_channel::bounded(32);
let (atx, arx) = async_channel::bounded(32);
let ioc_finder_fut = async move {
let mut finder = FindIocStream::new(tgts, FINDER_TIMEOUT, FINDER_IN_FLIGHT_MAX, FINDER_BATCH_SIZE);
let fut_tick_dur = Duration::from_millis(100);
let mut finder_more = true;
let mut finder_fut = OptFut::new(finder.next());
let mut qrx_fut = OptFut::new(qrx.recv());
let mut qrx_more = true;
let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
let mut asend = OptFut::empty();
loop {
tokio::select! {
_ = &mut asend, if asend.is_enabled() => {
asend = OptFut::empty();
}
r1 = &mut finder_fut, if finder_fut.is_enabled() => {
finder_fut = OptFut::empty();
match r1 {
Some(item) => {
asend = OptFut::new(atx.send(item));
}
None => {
// TODO finder has stopped, do no longer poll on it
warn!("Finder has stopped");
finder_more = false;
}
}
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
r2 = &mut qrx_fut, if qrx_fut.is_enabled() => {
qrx_fut = OptFut::empty();
match r2 {
Ok(item) => {
finder.push(item);
}
Err(e) => {
// TODO input is done... ignore from here on.
error!("Finder input channel error {e}");
qrx_more = false;
}
}
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
} else {
finder_fut = OptFut::empty();
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
_ = &mut fut_tick => {
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
qrx_fut = OptFut::new(qrx.recv());
}
if finder_more {
finder_fut = OptFut::new(finder.next());
} else {
finder_fut = OptFut::empty();
}
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
}
else => {
error!("all branches are disabled");
break;
}
};
}
};
let ioc_finder_jh = taskrun::spawn(ioc_finder_fut);
taskrun::spawn({
async move {
while let Ok(item) = arx.recv().await {
match tx.send(DaemonEvent::SearchDone(item)).await {
Ok(_) => {}
Err(e) => {
error!("search res fwd {e}");
}
}
}
warn!("search res fwd nput broken");
}
});
(qtx, ioc_finder_jh)
}