WIP timeout handling

This commit is contained in:
Dominik Werder
2023-01-04 19:54:27 +01:00
parent 99357437c9
commit 4ea83f7a1f
6 changed files with 368 additions and 153 deletions

View File

@@ -20,6 +20,7 @@ async-channel = "1.6"
chrono = "0.4"
bytes = "1.1"
scylla = "0.7"
tokio-postgres = "0.7.7"
serde = { version = "1.0", features = ["derive"] }
err = { path = "../../daqbuffer/err" }
log = { path = "../log" }

View File

@@ -1,6 +1,7 @@
use async_channel::Receiver;
use async_channel::Sender;
use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::StreamExt;
use log::*;
@@ -8,16 +9,24 @@ use netfetch::ca::conn::CaConn;
use netfetch::ca::conn::ConnCommand;
use netfetch::ca::findioc::FindIocRes;
use netfetch::ca::findioc::FindIocStream;
use netfetch::ca::store::DataStore;
use netfetch::conf::CaIngestOpts;
use netfetch::errconv::ErrConv;
use netfetch::store::CommonInsertItemQueue;
use netpod::Database;
use netpod::ScyllaConfig;
use serde::Serialize;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::IpAddr;
use std::fmt;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use tokio_postgres::Client as PgClient;
const CHECK_CHANS_PER_TICK: usize = 10;
const CHECK_CHANS_PER_TICK: usize = 50000;
#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)]
pub struct Channel {
@@ -65,12 +74,32 @@ pub enum ActiveChannelState {
NoAddress,
}
#[derive(Clone, Debug, Serialize)]
pub enum ChannelState {
enum ChanOp {
Finder(String, SystemTime),
ConnCmd(Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>),
}
impl fmt::Debug for ChanOp {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Finder(arg0, arg1) => fmt.debug_tuple("Finder").field(arg0).field(arg1).finish(),
Self::ConnCmd(_arg0) => fmt.debug_tuple("ConnCmd").finish(),
}
}
}
#[derive(Debug, Serialize)]
pub enum ChannelStateValue {
Active(ActiveChannelState),
ToRemove { addr: Option<SocketAddrV4> },
}
#[derive(Debug)]
pub struct ChannelState {
value: ChannelStateValue,
pending_op: Option<ChanOp>,
}
#[derive(Debug)]
pub enum DaemonEvent {
TimerTick,
@@ -80,8 +109,20 @@ pub enum DaemonEvent {
}
pub struct DaemonOpts {
backend: String,
local_epics_hostname: String,
array_truncate: usize,
insert_item_queue_cap: usize,
search_tgts: Vec<SocketAddrV4>,
search_excl: Vec<SocketAddrV4>,
//search_excl: Vec<SocketAddrV4>,
pgconf: Database,
scyconf: ScyllaConfig,
}
impl DaemonOpts {
pub fn backend(&self) -> &str {
&self.backend
}
}
struct OptFut<F> {
@@ -89,8 +130,16 @@ struct OptFut<F> {
}
impl<F> OptFut<F> {
fn new(fut: Option<F>) -> Self {
Self { fut }
fn empty() -> Self {
Self { fut: None }
}
fn new(fut: F) -> Self {
Self { fut: Some(fut) }
}
fn is_enabled(&self) -> bool {
self.fut.is_some()
}
}
@@ -108,99 +157,51 @@ where
}
}
pub async fn make_pg_client(d: &Database) -> Result<PgClient, Error> {
let (client, pg_conn) = tokio_postgres::connect(
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name),
tokio_postgres::tls::NoTls,
)
.await
.err_conv()?;
// TODO allow clean shutdown on ctrl-c and join the pg_conn in the end:
tokio::spawn(pg_conn);
Ok(client)
}
pub struct Daemon {
opts: DaemonOpts,
channel_states: BTreeMap<Channel, ChannelState>,
tx: Sender<DaemonEvent>,
rx: Receiver<DaemonEvent>,
conns: BTreeMap<SocketAddrV4, CaConn>,
conns: BTreeMap<SocketAddrV4, (Sender<ConnCommand>, tokio::task::JoinHandle<()>)>,
chan_check_next: Option<Channel>,
search_tx: Sender<String>,
ioc_finder_jh: tokio::task::JoinHandle<()>,
datastore: Arc<DataStore>,
common_insert_item_queue: CommonInsertItemQueue,
}
impl Daemon {
pub fn new(opts: DaemonOpts) -> Self {
let (tx, rx) = async_channel::bounded(1);
pub async fn new(opts: DaemonOpts) -> Result<Self, Error> {
let pg_client = make_pg_client(&opts.pgconf).await?;
let pg_client = Arc::new(pg_client);
let datastore = DataStore::new(&opts.scyconf, pg_client).await?;
let datastore = Arc::new(datastore);
let (tx, rx) = async_channel::bounded(32);
let tgts = opts.search_tgts.clone();
let (search_tx, ioc_finder_jh) = {
let (qtx, qrx) = async_channel::bounded(1);
let (atx, arx) = async_channel::bounded(1);
let ioc_finder_fut = async move {
const FINDER_JOB_QUEUE_LEN_MAX: usize = 1;
let mut finder = FindIocStream::new(tgts);
let mut fut1 = finder.next();
let mut fut2 = qrx.recv().fuse();
let mut fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)).fuse());
let mut asend = OptFut::new(None).fuse();
loop {
tokio::time::sleep(Duration::from_millis(200)).await;
futures_util::select! {
_ = asend => {
info!("asend done");
}
r1 = fut1 => {
match r1 {
Some(item) => {
asend = OptFut::new(Some(atx.send(item))).fuse();
}
None => {
// TODO finder has stopped, do no longer poll on it
}
}
if finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
fut2 = qrx.recv().fuse();
}
fut1 = finder.next();
fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)).fuse());
}
r2 = fut2 => {
match r2 {
Ok(item) => {
info!("Push to finder: {item:?}");
finder.push(item);
}
Err(e) => {
// TODO input is done... ignore from here on.
error!("{e}");
break;
}
}
if finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
fut2 = qrx.recv().fuse();
}
fut1 = finder.next();
fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)).fuse());
}
_ = fut_tick => {
if finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
//fut2 = qrx.recv().fuse();
}
fut1 = finder.next();
fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)).fuse());
}
};
let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), tgts);
let common_insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap);
// TODO hook up with insert worker
tokio::spawn({
let rx = common_insert_item_queue.receiver();
async move {
while let Ok(item) = rx.recv().await {
info!("insert queue item {item:?}");
}
};
let ioc_finder_jh = taskrun::spawn(ioc_finder_fut);
taskrun::spawn({
let tx = tx.clone();
async move {
while let Ok(item) = arx.recv().await {
info!("forward search result item");
match tx.send(DaemonEvent::SearchDone(item)).await {
Ok(_) => {}
Err(e) => {
error!("search res fwd {e}");
}
}
}
warn!("search res fwd nput broken");
}
});
(qtx, ioc_finder_jh)
};
Self {
}
});
let ret = Self {
opts,
channel_states: BTreeMap::new(),
tx,
@@ -209,83 +210,227 @@ impl Daemon {
chan_check_next: None,
search_tx,
ioc_finder_jh,
}
datastore,
common_insert_item_queue,
};
Ok(ret)
}
fn check_chans(&mut self) -> Result<(), Error> {
fn start_finder(tx: Sender<DaemonEvent>, tgts: Vec<SocketAddrV4>) -> (Sender<String>, tokio::task::JoinHandle<()>) {
let (qtx, qrx) = async_channel::bounded(32);
let (atx, arx) = async_channel::bounded(32);
let ioc_finder_fut = async move {
const FINDER_JOB_QUEUE_LEN_MAX: usize = 1;
let mut finder = FindIocStream::new(tgts);
let mut finder_more = true;
let mut fut1 = OptFut::new(finder.next());
let mut fut2 = OptFut::new(qrx.recv());
let mut qrx_more = true;
let mut fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
let mut asend = OptFut::empty();
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
tokio::select! {
_ = &mut asend, if asend.is_enabled() => {
//info!("finder asend done");
asend = OptFut::empty();
}
r1 = &mut fut1, if fut1.is_enabled() => {
//info!("finder fut1");
fut1 = OptFut::empty();
match r1 {
Some(item) => {
asend = OptFut::new(atx.send(item));
}
None => {
// TODO finder has stopped, do no longer poll on it
warn!("Finder has stopped");
finder_more = false;
}
}
//info!("finder.job_queue_len() {}", finder.job_queue_len());
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
fut2 = OptFut::new(qrx.recv());
}
if finder_more {
fut1 = OptFut::new(finder.next());
}
fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)));
}
r2 = &mut fut2, if fut2.is_enabled() => {
//info!("finder fut2");
fut2 = OptFut::empty();
match r2 {
Ok(item) => {
//info!("Push to finder: {item:?}");
finder.push(item);
}
Err(e) => {
// TODO input is done... ignore from here on.
error!("Finder input channel error {e}");
qrx_more = false;
}
}
//info!("finder.job_queue_len() {}", finder.job_queue_len());
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
fut2 = OptFut::new(qrx.recv());
}
if finder_more {
fut1 = OptFut::new(finder.next());
} else {
fut1 = OptFut::empty();
}
fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)));
}
_ = &mut fut_tick => {
//info!("finder fut_tick finder.job_queue_len() {}", finder.job_queue_len());
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
fut2 = OptFut::new(qrx.recv());
}
if finder_more {
fut1 = OptFut::new(finder.next());
} else {
fut1 = OptFut::empty();
}
fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)));
}
else => {
error!("all branches are disabled");
break;
}
};
}
};
let ioc_finder_jh = taskrun::spawn(ioc_finder_fut);
taskrun::spawn({
async move {
while let Ok(item) = arx.recv().await {
//info!("forward search result item");
match tx.send(DaemonEvent::SearchDone(item)).await {
Ok(_) => {}
Err(e) => {
error!("search res fwd {e}");
}
}
}
warn!("search res fwd nput broken");
}
});
(qtx, ioc_finder_jh)
}
async fn check_chans(&mut self) -> Result<(), Error> {
let tsnow = SystemTime::now();
let k = self.chan_check_next.take();
info!("check_chans start at {:?}", k);
info!("------------ check_chans start at {:?}", k);
let mut currently_search_pending = 0;
for (_ch, st) in &self.channel_states {
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) = &st.value {
currently_search_pending += 1;
}
}
let it = if let Some(last) = k {
self.channel_states.range_mut(last..)
} else {
self.channel_states.range_mut(..)
};
let mut count_unknown_address = 0;
let mut count_search_pending = 0;
let mut count_no_address = 0;
let mut count_unassigned = 0;
let mut count_assigned = 0;
for (i, (ch, st)) in it.enumerate() {
info!("check chan {} {:?}", i, ch);
use ActiveChannelState::*;
use ChannelState::*;
match st {
use ChannelStateValue::*;
match &mut st.value {
Active(st2) => match st2 {
UnknownAddress => {
if self.search_tx.is_full() {
// TODO what to do if the queue is full?
} else {
match self.search_tx.try_send(ch.id().into()) {
Ok(_) => {
*st = Active(SearchPending { since: tsnow });
}
Err(_) => {
error!("can not send search query");
}
//info!("UnknownAddress {} {:?}", i, ch);
count_unknown_address += 1;
if currently_search_pending < 10 {
currently_search_pending += 1;
if st.pending_op.is_none() {
st.pending_op = Some(ChanOp::Finder(ch.id().to_string(), tsnow));
st.value = Active(SearchPending { since: tsnow });
}
}
}
SearchPending { since } => {
//info!("SearchPending {} {:?}", i, ch);
count_search_pending += 1;
// TODO handle Err
match tsnow.duration_since(*since) {
Ok(dt) => {
if dt >= Duration::from_millis(10000) {
warn!("Search timeout for {ch:?}");
*st = Active(ActiveChannelState::NoAddress);
st.value = Active(ActiveChannelState::NoAddress);
currently_search_pending -= 1;
}
}
Err(e) => {
error!("{e}");
error!("SearchPending {e}");
}
}
}
WithAddress { addr, state } => {
//info!("WithAddress {} {:?}", i, ch);
use WithAddressState::*;
match state {
Unassigned { assign_at } => {
count_unassigned += 1;
if *assign_at <= tsnow {
match self.conns.get(addr) {
Some(conn) => {
let tx = conn.conn_command_tx();
let (cmd, rx) = ConnCommand::channel_add(ch.id().into());
// TODO how to send the command from this non-async context?
//tx.send(cmd).await;
// TODO if the send can be assumed to be on its way (it may still fail) then update state
if true {
let cs = ConnectionState {
updated: tsnow,
value: ConnectionStateValue::Unconnected,
};
*state = WithAddressState::Assigned(cs)
}
if st.pending_op.is_none() {
if !self.conns.contains_key(addr) {
info!("==================== create CaConn for {ch:?}");
let backend = self.opts.backend().into();
let local_epics_hostname = self.opts.local_epics_hostname.clone();
let array_truncate = self.opts.array_truncate;
let insert_item_sender = self.common_insert_item_queue.sender().await;
let mut conn = CaConn::new(
backend,
addr.clone(),
local_epics_hostname,
self.datastore.clone(),
insert_item_sender,
array_truncate,
256,
);
let conn_tx = conn.conn_command_tx();
let conn_fut = async move { while let Some(_item) = conn.next().await {} };
let conn_jh = tokio::spawn(conn_fut);
self.conns.insert(*addr, (conn_tx, conn_jh));
}
if let Some((tx, _)) = self.conns.get(addr) {
let tx = tx.clone();
let (cmd, rx) = ConnCommand::channel_add(ch.id().into());
let fut = async move {
tx.send(cmd).await?;
let res = rx.recv().await?;
info!("answer from CaConn: {res:?}");
Ok(())
};
st.pending_op = Some(ChanOp::ConnCmd(Box::pin(fut)));
let cs = ConnectionState {
updated: tsnow,
value: ConnectionStateValue::Unconnected,
};
*state = WithAddressState::Assigned(cs)
} else {
error!("no CaConn for {ch:?}");
}
None => {}
}
}
}
Assigned(_) => {
// TODO check if channel is healthy and alive
count_assigned += 1;
}
}
}
NoAddress => {
// TODO try to find address again after some randomized timeout
//info!("NoAddress {} {:?}", i, ch);
count_no_address += 1;
}
},
ToRemove { .. } => {
@@ -297,51 +442,104 @@ impl Daemon {
break;
}
}
info!(
"{:8} {:8} {:8} {:8} {:8}",
count_unknown_address, count_search_pending, count_unassigned, count_assigned, count_no_address
);
for (_ch, st) in &mut self.channel_states {
match &mut st.pending_op {
Some(op) => match op {
ChanOp::Finder(s, start) => {
if *start + Duration::from_millis(10000) >= tsnow {
match self.search_tx.try_send(s.clone()) {
Ok(_) => {
st.pending_op = None;
info!("OK, sent msg to Finder");
}
Err(e) => match e {
async_channel::TrySendError::Full(_) => {
//warn!("Finder channel full");
*start = tsnow;
}
async_channel::TrySendError::Closed(_) => {
error!("Finder channel closed");
}
},
}
} else {
st.pending_op = None;
warn!("ChanOp::Finder timeout");
}
}
ChanOp::ConnCmd(fut) => {
use std::task::Poll::*;
match futures_util::poll!(fut) {
Ready(res) => {
st.pending_op = None;
match res {
Ok(_) => {
info!("ChanOp::ConnCmd completed fine");
}
Err(e) => {
error!("ChanOp::ConnCmd {e}");
}
}
}
Pending => {}
}
}
},
None => {}
}
}
Ok(())
}
fn handle_timer_tick(&mut self) -> Result<(), Error> {
self.check_chans()?;
async fn handle_timer_tick(&mut self) -> Result<(), Error> {
self.check_chans().await?;
Ok(())
}
fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> {
if !self.channel_states.contains_key(&ch) {
self.channel_states
.insert(ch, ChannelState::Active(ActiveChannelState::UnknownAddress));
let st = ChannelState {
value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress),
pending_op: None,
};
self.channel_states.insert(ch, st);
}
Ok(())
}
fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> {
if let Some(k) = self.channel_states.get_mut(&ch) {
match k {
ChannelState::Active(j) => match j {
match &k.value {
ChannelStateValue::Active(j) => match j {
ActiveChannelState::UnknownAddress => {
*k = ChannelState::ToRemove { addr: None };
k.value = ChannelStateValue::ToRemove { addr: None };
}
ActiveChannelState::SearchPending { .. } => {
*k = ChannelState::ToRemove { addr: None };
k.value = ChannelStateValue::ToRemove { addr: None };
}
ActiveChannelState::WithAddress { addr, .. } => {
*k = ChannelState::ToRemove {
k.value = ChannelStateValue::ToRemove {
addr: Some(addr.clone()),
};
}
ActiveChannelState::NoAddress => {
*k = ChannelState::ToRemove { addr: None };
k.value = ChannelStateValue::ToRemove { addr: None };
}
},
ChannelState::ToRemove { .. } => {}
ChannelStateValue::ToRemove { .. } => {}
}
}
Ok(())
}
fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> {
async fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> {
use DaemonEvent::*;
match item {
TimerTick => self.handle_timer_tick(),
TimerTick => self.handle_timer_tick().await,
ChannelAdd(ch) => self.handle_channel_add(ch),
ChannelRemove(ch) => self.handle_channel_remove(ch),
SearchDone(res) => {
@@ -353,14 +551,16 @@ impl Daemon {
let addr = addr.clone();
let ch = Channel::new(res.channel);
if let Some(st) = self.channel_states.get_mut(&ch) {
if let ChannelState::Active(ActiveChannelState::SearchPending { .. }) = st {
let stnew = ChannelState::Active(ActiveChannelState::WithAddress {
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) =
&st.value
{
let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress {
addr,
state: WithAddressState::Unassigned {
assign_at: SystemTime::now(),
},
});
self.channel_states.insert(ch, stnew);
st.value = stnew;
} else {
warn!("state for {ch:?} is not SearchPending");
}
@@ -385,7 +585,7 @@ impl Daemon {
let ticker = {
let tx = self.tx.clone();
async move {
let mut ticker = tokio::time::interval(Duration::from_millis(500));
let mut ticker = tokio::time::interval(Duration::from_millis(1500));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
ticker.tick().await;
@@ -401,7 +601,7 @@ impl Daemon {
match self.rx.recv().await {
Ok(item) => {
info!("got daemon event {item:?}");
match self.handle_event(item) {
match self.handle_event(item).await {
Ok(_) => {}
Err(e) => {
error!("daemon: {e}");
@@ -415,6 +615,8 @@ impl Daemon {
}
}
}
warn!("TODO shut down IOC finder properly");
let _ = &self.ioc_finder_jh;
Ok(())
}
}
@@ -428,10 +630,15 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
}
info!("parsed search_tgts {search_tgts:?}");
let opts2 = DaemonOpts {
backend: opts.backend().into(),
local_epics_hostname: opts.local_epics_hostname().into(),
array_truncate: opts.array_truncate(),
insert_item_queue_cap: opts.insert_item_queue_cap(),
pgconf: opts.postgresql().clone(),
scyconf: opts.scylla().clone(),
search_tgts,
search_excl: Vec::new(),
};
let mut daemon = Daemon::new(opts2);
let mut daemon = Daemon::new(opts2).await?;
let tx = daemon.tx.clone();
let daemon_jh = taskrun::spawn(async move {
// TODO handle Err

View File

@@ -21,7 +21,7 @@ byteorder = "1.4"
futures-util = "0.3"
#pin-project-lite = "0.2"
scylla = "0.7"
tokio-postgres = "0.7.6"
tokio-postgres = "0.7.7"
md-5 = "0.10"
hex = "0.4"
libc = "0.2"

View File

@@ -5,7 +5,6 @@ use super::proto::CaMsgTy;
use super::proto::CaProto;
use super::store::DataStore;
use super::ExtraInsertsConf;
use super::IngestCommons;
use crate::bsread::ChannelDescDecoded;
use crate::ca::proto::CreateChan;
use crate::ca::proto::EventAdd;
@@ -392,8 +391,6 @@ pub struct CaConn {
conn_backoff: f32,
conn_backoff_beg: f32,
inserts_counter: u64,
#[allow(unused)]
ingest_commons: Arc<IngestCommons>,
extra_inserts_conf: ExtraInsertsConf,
}
@@ -406,7 +403,6 @@ impl CaConn {
insert_item_sender: CommonInsertItemQueueSender,
array_truncate: usize,
insert_queue_max: usize,
ingest_commons: Arc<IngestCommons>,
) -> Self {
let (cq_tx, cq_rx) = async_channel::bounded(32);
Self {
@@ -437,7 +433,6 @@ impl CaConn {
conn_backoff: 0.02,
conn_backoff_beg: 0.02,
inserts_counter: 0,
ingest_commons,
extra_inserts_conf: ExtraInsertsConf::new(),
}
}
@@ -671,7 +666,11 @@ impl CaConn {
self.stats.inserts_queue_push_inc();
self.insert_item_send_fut = None;
}
Ready(Err(_)) => break Ready(Err(Error::with_msg_no_trace(format!("can not send the item")))),
Ready(Err(e)) => {
self.insert_item_send_fut = None;
error!("handle_insert_futs can not send item {e}");
break Ready(Err(Error::with_msg_no_trace(format!("can not send the item"))));
}
Pending => {
if false {
// TODO test this case.
@@ -985,7 +984,10 @@ impl CaConn {
fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> {
// TODO handle subid-not-found which can also be peer error:
let cid = *self.cid_by_subid.get(&ev.subid).unwrap();
// TODO get rid of the string clone when I don't want the log output any longer:
if true {
let name = self.name_by_cid(cid);
info!("event {name:?} {ev:?}");
}
// TODO handle not-found error:
let mut series_2 = None;
let ch_s = self.channels.get_mut(&cid).unwrap();

View File

@@ -89,7 +89,6 @@ impl CaConnSet {
insert_queue_max: usize,
insert_item_queue_sender: CommonInsertItemQueueSender,
data_store: Arc<DataStore>,
ingest_commons: Arc<IngestCommons>,
with_channels: Vec<String>,
) -> Result<(), Error> {
info!("create new CaConn {:?}", addr);
@@ -102,7 +101,6 @@ impl CaConnSet {
insert_item_queue_sender,
array_truncate,
insert_queue_max,
ingest_commons,
);
for ch in with_channels {
conn.channel_add(ch);
@@ -273,7 +271,6 @@ impl CaConnSet {
200,
ingest_commons.insert_item_queue.sender().await,
ingest_commons.data_store.clone(),
ingest_commons.clone(),
vec![channel_name],
)
.await?;

View File

@@ -1,6 +1,6 @@
use crate::ca::proto::{CaMsg, CaMsgTy, HeadInfo};
use err::Error;
use futures_util::{FutureExt, Stream};
use futures_util::{Future, FutureExt, Stream};
use libc::c_int;
use log::*;
use std::collections::{BTreeMap, VecDeque};
@@ -69,6 +69,7 @@ pub struct FindIocStream {
bids_timed_out: BTreeMap<BatchId, ()>,
sids_done: BTreeMap<SearchId, ()>,
result_for_done_sid_count: u64,
sleeper: Pin<Box<dyn Future<Output = ()> + Send>>,
}
impl FindIocStream {
@@ -94,6 +95,7 @@ impl FindIocStream {
in_flight_max: 20,
channels_per_batch: 10,
batch_run_max: Duration::from_millis(2500),
sleeper: Box::pin(tokio::time::sleep(Duration::from_millis(500))),
}
}
@@ -574,7 +576,13 @@ impl Stream for FindIocStream {
continue;
} else {
if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() {
Ready(None)
match self.sleeper.poll_unpin(cx) {
Ready(_) => {
self.sleeper = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
continue;
}
Pending => Pending,
}
} else {
Pending
}