Tune search timeouts
This commit is contained in:
@@ -12,6 +12,7 @@ use netfetch::ca::findioc::FindIocStream;
|
||||
use netfetch::ca::store::DataStore;
|
||||
use netfetch::conf::CaIngestOpts;
|
||||
use netfetch::errconv::ErrConv;
|
||||
use netfetch::metrics::ExtraInsertsConf;
|
||||
use netfetch::store::CommonInsertItemQueue;
|
||||
use netpod::Database;
|
||||
use netpod::ScyllaConfig;
|
||||
@@ -21,12 +22,22 @@ use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use tokio_postgres::Client as PgClient;
|
||||
|
||||
const CHECK_CHANS_PER_TICK: usize = 50000;
|
||||
const CHECK_CHANS_PER_TICK: usize = 10000;
|
||||
const FINDER_TIMEOUT: usize = 100;
|
||||
const FINDER_JOB_QUEUE_LEN_MAX: usize = 20;
|
||||
const FINDER_IN_FLIGHT_MAX: usize = 200;
|
||||
const FINDER_BATCH_SIZE: usize = 8;
|
||||
const CURRENT_SEARCH_PENDING_MAX: usize = 220;
|
||||
const SEARCH_PENDING_TIMEOUT: usize = 10000;
|
||||
const TIMEOUT_WARN_FACTOR: usize = 10;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)]
|
||||
pub struct Channel {
|
||||
@@ -179,25 +190,49 @@ pub struct Daemon {
|
||||
search_tx: Sender<String>,
|
||||
ioc_finder_jh: tokio::task::JoinHandle<()>,
|
||||
datastore: Arc<DataStore>,
|
||||
common_insert_item_queue: CommonInsertItemQueue,
|
||||
common_insert_item_queue: Arc<CommonInsertItemQueue>,
|
||||
insert_queue_counter: Arc<AtomicUsize>,
|
||||
count_unknown_address: usize,
|
||||
count_search_pending: usize,
|
||||
count_no_address: usize,
|
||||
count_unassigned: usize,
|
||||
count_assigned: usize,
|
||||
last_status_print: SystemTime,
|
||||
}
|
||||
|
||||
impl Daemon {
|
||||
pub async fn new(opts: DaemonOpts) -> Result<Self, Error> {
|
||||
let pg_client = make_pg_client(&opts.pgconf).await?;
|
||||
let pg_client = Arc::new(pg_client);
|
||||
let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?);
|
||||
let datastore = DataStore::new(&opts.scyconf, pg_client).await?;
|
||||
let datastore = Arc::new(datastore);
|
||||
let (tx, rx) = async_channel::bounded(32);
|
||||
let tgts = opts.search_tgts.clone();
|
||||
let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), tgts);
|
||||
let common_insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap);
|
||||
let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap));
|
||||
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let ingest_commons = netfetch::ca::IngestCommons {
|
||||
pgconf: Arc::new(opts.pgconf.clone()),
|
||||
backend: opts.backend().into(),
|
||||
local_epics_hostname: opts.local_epics_hostname.clone(),
|
||||
insert_item_queue: common_insert_item_queue.clone(),
|
||||
data_store: datastore.clone(),
|
||||
insert_ivl_min: Arc::new(AtomicU64::new(0)),
|
||||
extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()),
|
||||
store_workers_rate: AtomicU64::new(20000),
|
||||
insert_frac: AtomicU64::new(1000),
|
||||
ca_conn_set: netfetch::ca::connset::CaConnSet::new(),
|
||||
};
|
||||
let _ingest_commons = Arc::new(ingest_commons);
|
||||
|
||||
// TODO hook up with insert worker
|
||||
tokio::spawn({
|
||||
let rx = common_insert_item_queue.receiver();
|
||||
let insert_queue_counter = insert_queue_counter.clone();
|
||||
async move {
|
||||
while let Ok(item) = rx.recv().await {
|
||||
info!("insert queue item {item:?}");
|
||||
insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel);
|
||||
trace!("insert queue item {item:?}");
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -212,6 +247,13 @@ impl Daemon {
|
||||
ioc_finder_jh,
|
||||
datastore,
|
||||
common_insert_item_queue,
|
||||
insert_queue_counter,
|
||||
count_unknown_address: 0,
|
||||
count_search_pending: 0,
|
||||
count_no_address: 0,
|
||||
count_unassigned: 0,
|
||||
count_assigned: 0,
|
||||
last_status_print: SystemTime::now(),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -220,24 +262,29 @@ impl Daemon {
|
||||
let (qtx, qrx) = async_channel::bounded(32);
|
||||
let (atx, arx) = async_channel::bounded(32);
|
||||
let ioc_finder_fut = async move {
|
||||
const FINDER_JOB_QUEUE_LEN_MAX: usize = 1;
|
||||
let mut finder = FindIocStream::new(tgts);
|
||||
let mut finder = FindIocStream::new(
|
||||
tgts,
|
||||
Duration::from_millis(FINDER_TIMEOUT as u64),
|
||||
FINDER_IN_FLIGHT_MAX,
|
||||
FINDER_BATCH_SIZE,
|
||||
);
|
||||
let fut_tick_dur = Duration::from_millis(100);
|
||||
let mut finder_more = true;
|
||||
let mut fut1 = OptFut::new(finder.next());
|
||||
let mut fut2 = OptFut::new(qrx.recv());
|
||||
let mut finder_fut = OptFut::new(finder.next());
|
||||
let mut qrx_fut = OptFut::new(qrx.recv());
|
||||
let mut qrx_more = true;
|
||||
let mut fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
|
||||
let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
|
||||
let mut asend = OptFut::empty();
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
//tokio::time::sleep(Duration::from_millis(1)).await;
|
||||
tokio::select! {
|
||||
_ = &mut asend, if asend.is_enabled() => {
|
||||
//info!("finder asend done");
|
||||
asend = OptFut::empty();
|
||||
}
|
||||
r1 = &mut fut1, if fut1.is_enabled() => {
|
||||
r1 = &mut finder_fut, if finder_fut.is_enabled() => {
|
||||
//info!("finder fut1");
|
||||
fut1 = OptFut::empty();
|
||||
finder_fut = OptFut::empty();
|
||||
match r1 {
|
||||
Some(item) => {
|
||||
asend = OptFut::new(atx.send(item));
|
||||
@@ -250,16 +297,16 @@ impl Daemon {
|
||||
}
|
||||
//info!("finder.job_queue_len() {}", finder.job_queue_len());
|
||||
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
|
||||
fut2 = OptFut::new(qrx.recv());
|
||||
qrx_fut = OptFut::new(qrx.recv());
|
||||
}
|
||||
if finder_more {
|
||||
fut1 = OptFut::new(finder.next());
|
||||
finder_fut = OptFut::new(finder.next());
|
||||
}
|
||||
fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)));
|
||||
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
|
||||
}
|
||||
r2 = &mut fut2, if fut2.is_enabled() => {
|
||||
r2 = &mut qrx_fut, if qrx_fut.is_enabled() => {
|
||||
//info!("finder fut2");
|
||||
fut2 = OptFut::empty();
|
||||
qrx_fut = OptFut::empty();
|
||||
match r2 {
|
||||
Ok(item) => {
|
||||
//info!("Push to finder: {item:?}");
|
||||
@@ -273,26 +320,26 @@ impl Daemon {
|
||||
}
|
||||
//info!("finder.job_queue_len() {}", finder.job_queue_len());
|
||||
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
|
||||
fut2 = OptFut::new(qrx.recv());
|
||||
qrx_fut = OptFut::new(qrx.recv());
|
||||
}
|
||||
if finder_more {
|
||||
fut1 = OptFut::new(finder.next());
|
||||
finder_fut = OptFut::new(finder.next());
|
||||
} else {
|
||||
fut1 = OptFut::empty();
|
||||
finder_fut = OptFut::empty();
|
||||
}
|
||||
fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)));
|
||||
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
|
||||
}
|
||||
_ = &mut fut_tick => {
|
||||
//info!("finder fut_tick finder.job_queue_len() {}", finder.job_queue_len());
|
||||
if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
|
||||
fut2 = OptFut::new(qrx.recv());
|
||||
qrx_fut = OptFut::new(qrx.recv());
|
||||
}
|
||||
if finder_more {
|
||||
fut1 = OptFut::new(finder.next());
|
||||
finder_fut = OptFut::new(finder.next());
|
||||
} else {
|
||||
fut1 = OptFut::empty();
|
||||
finder_fut = OptFut::empty();
|
||||
}
|
||||
fut_tick = Box::pin(tokio::time::sleep(Duration::from_millis(2000)));
|
||||
fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
|
||||
}
|
||||
else => {
|
||||
error!("all branches are disabled");
|
||||
@@ -322,11 +369,13 @@ impl Daemon {
|
||||
async fn check_chans(&mut self) -> Result<(), Error> {
|
||||
let tsnow = SystemTime::now();
|
||||
let k = self.chan_check_next.take();
|
||||
info!("------------ check_chans start at {:?}", k);
|
||||
trace!("------------ check_chans start at {:?}", k);
|
||||
let mut currently_search_pending = 0;
|
||||
for (_ch, st) in &self.channel_states {
|
||||
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) = &st.value {
|
||||
currently_search_pending += 1;
|
||||
{
|
||||
for (_ch, st) in &self.channel_states {
|
||||
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) = &st.value {
|
||||
currently_search_pending += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
let it = if let Some(last) = k {
|
||||
@@ -334,11 +383,6 @@ impl Daemon {
|
||||
} else {
|
||||
self.channel_states.range_mut(..)
|
||||
};
|
||||
let mut count_unknown_address = 0;
|
||||
let mut count_search_pending = 0;
|
||||
let mut count_no_address = 0;
|
||||
let mut count_unassigned = 0;
|
||||
let mut count_assigned = 0;
|
||||
for (i, (ch, st)) in it.enumerate() {
|
||||
use ActiveChannelState::*;
|
||||
use ChannelStateValue::*;
|
||||
@@ -346,8 +390,7 @@ impl Daemon {
|
||||
Active(st2) => match st2 {
|
||||
UnknownAddress => {
|
||||
//info!("UnknownAddress {} {:?}", i, ch);
|
||||
count_unknown_address += 1;
|
||||
if currently_search_pending < 10 {
|
||||
if currently_search_pending < CURRENT_SEARCH_PENDING_MAX {
|
||||
currently_search_pending += 1;
|
||||
if st.pending_op.is_none() {
|
||||
st.pending_op = Some(ChanOp::Finder(ch.id().to_string(), tsnow));
|
||||
@@ -357,12 +400,11 @@ impl Daemon {
|
||||
}
|
||||
SearchPending { since } => {
|
||||
//info!("SearchPending {} {:?}", i, ch);
|
||||
count_search_pending += 1;
|
||||
// TODO handle Err
|
||||
match tsnow.duration_since(*since) {
|
||||
Ok(dt) => {
|
||||
if dt >= Duration::from_millis(10000) {
|
||||
warn!("Search timeout for {ch:?}");
|
||||
if dt >= Duration::from_millis(SEARCH_PENDING_TIMEOUT as u64) {
|
||||
debug!("Search timeout for {ch:?}");
|
||||
st.value = Active(ActiveChannelState::NoAddress);
|
||||
currently_search_pending -= 1;
|
||||
}
|
||||
@@ -377,11 +419,10 @@ impl Daemon {
|
||||
use WithAddressState::*;
|
||||
match state {
|
||||
Unassigned { assign_at } => {
|
||||
count_unassigned += 1;
|
||||
if *assign_at <= tsnow {
|
||||
if st.pending_op.is_none() {
|
||||
if !self.conns.contains_key(addr) {
|
||||
info!("==================== create CaConn for {ch:?}");
|
||||
debug!("==================== create CaConn for {ch:?}");
|
||||
let backend = self.opts.backend().into();
|
||||
let local_epics_hostname = self.opts.local_epics_hostname.clone();
|
||||
let array_truncate = self.opts.array_truncate;
|
||||
@@ -406,7 +447,10 @@ impl Daemon {
|
||||
let fut = async move {
|
||||
tx.send(cmd).await?;
|
||||
let res = rx.recv().await?;
|
||||
info!("answer from CaConn: {res:?}");
|
||||
debug!("answer from CaConn: {res:?}");
|
||||
if res != true {
|
||||
warn!("problem from CaConn");
|
||||
}
|
||||
Ok(())
|
||||
};
|
||||
st.pending_op = Some(ChanOp::ConnCmd(Box::pin(fut)));
|
||||
@@ -423,14 +467,12 @@ impl Daemon {
|
||||
}
|
||||
Assigned(_) => {
|
||||
// TODO check if channel is healthy and alive
|
||||
count_assigned += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
NoAddress => {
|
||||
// TODO try to find address again after some randomized timeout
|
||||
//info!("NoAddress {} {:?}", i, ch);
|
||||
count_no_address += 1;
|
||||
}
|
||||
},
|
||||
ToRemove { .. } => {
|
||||
@@ -442,19 +484,15 @@ impl Daemon {
|
||||
break;
|
||||
}
|
||||
}
|
||||
info!(
|
||||
"{:8} {:8} {:8} {:8} {:8}",
|
||||
count_unknown_address, count_search_pending, count_unassigned, count_assigned, count_no_address
|
||||
);
|
||||
for (_ch, st) in &mut self.channel_states {
|
||||
for (ch, st) in &mut self.channel_states {
|
||||
match &mut st.pending_op {
|
||||
Some(op) => match op {
|
||||
ChanOp::Finder(s, start) => {
|
||||
if *start + Duration::from_millis(10000) >= tsnow {
|
||||
match self.search_tx.try_send(s.clone()) {
|
||||
Ok(_) => {
|
||||
*start = tsnow;
|
||||
st.pending_op = None;
|
||||
info!("OK, sent msg to Finder");
|
||||
}
|
||||
Err(e) => match e {
|
||||
async_channel::TrySendError::Full(_) => {
|
||||
@@ -468,7 +506,11 @@ impl Daemon {
|
||||
}
|
||||
} else {
|
||||
st.pending_op = None;
|
||||
warn!("ChanOp::Finder timeout");
|
||||
warn!("ChanOp::Finder send timeout for {ch:?}");
|
||||
*st = ChannelState {
|
||||
value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress),
|
||||
pending_op: None,
|
||||
};
|
||||
}
|
||||
}
|
||||
ChanOp::ConnCmd(fut) => {
|
||||
@@ -478,7 +520,7 @@ impl Daemon {
|
||||
st.pending_op = None;
|
||||
match res {
|
||||
Ok(_) => {
|
||||
info!("ChanOp::ConnCmd completed fine");
|
||||
debug!("ChanOp::ConnCmd completed fine");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("ChanOp::ConnCmd {e}");
|
||||
@@ -492,11 +534,55 @@ impl Daemon {
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
{
|
||||
self.count_unknown_address = 0;
|
||||
self.count_search_pending = 0;
|
||||
self.count_no_address = 0;
|
||||
self.count_unassigned = 0;
|
||||
self.count_assigned = 0;
|
||||
for (_ch, st) in &self.channel_states {
|
||||
match &st.value {
|
||||
ChannelStateValue::Active(st) => match st {
|
||||
ActiveChannelState::UnknownAddress => {
|
||||
self.count_unknown_address += 1;
|
||||
}
|
||||
ActiveChannelState::SearchPending { .. } => {
|
||||
self.count_search_pending += 1;
|
||||
}
|
||||
ActiveChannelState::WithAddress { state, .. } => match state {
|
||||
WithAddressState::Unassigned { .. } => {
|
||||
self.count_unassigned += 1;
|
||||
}
|
||||
WithAddressState::Assigned(_) => {
|
||||
self.count_assigned += 1;
|
||||
}
|
||||
},
|
||||
ActiveChannelState::NoAddress => {
|
||||
self.count_no_address += 1;
|
||||
}
|
||||
},
|
||||
ChannelStateValue::ToRemove { .. } => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_timer_tick(&mut self) -> Result<(), Error> {
|
||||
let tsnow = SystemTime::now();
|
||||
self.check_chans().await?;
|
||||
if tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= Duration::from_millis(1000) {
|
||||
self.last_status_print = tsnow;
|
||||
info!(
|
||||
"{:8} {:8} {:8} : {:8} {:8} : {:10}",
|
||||
self.count_unknown_address,
|
||||
self.count_search_pending,
|
||||
self.count_no_address,
|
||||
self.count_unassigned,
|
||||
self.count_assigned,
|
||||
self.insert_queue_counter.load(atomic::Ordering::Acquire),
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -536,48 +622,77 @@ impl Daemon {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_search_done(&mut self, item: Result<VecDeque<FindIocRes>, Error>) -> Result<(), Error> {
|
||||
//debug!("handle SearchDone: {res:?}");
|
||||
match item {
|
||||
Ok(a) => {
|
||||
for res in a {
|
||||
if let Some(addr) = &res.addr {
|
||||
let addr = addr.clone();
|
||||
let ch = Channel::new(res.channel);
|
||||
if let Some(st) = self.channel_states.get_mut(&ch) {
|
||||
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since }) = &st.value {
|
||||
let dt = SystemTime::now().duration_since(*since).unwrap();
|
||||
if dt > Duration::from_millis(FINDER_TIMEOUT as u64 * TIMEOUT_WARN_FACTOR as u64) {
|
||||
warn!(
|
||||
" FOUND {:5.0} {:5.0} {addr}",
|
||||
1e3 * dt.as_secs_f32(),
|
||||
1e3 * res.dt.as_secs_f32()
|
||||
);
|
||||
}
|
||||
let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress {
|
||||
addr,
|
||||
state: WithAddressState::Unassigned {
|
||||
assign_at: SystemTime::now(),
|
||||
},
|
||||
});
|
||||
st.value = stnew;
|
||||
} else {
|
||||
warn!(
|
||||
"address found, but state for {ch:?} is not SearchPending: {:?}",
|
||||
st.value
|
||||
);
|
||||
}
|
||||
} else {
|
||||
warn!("can not find channel state for {ch:?}");
|
||||
}
|
||||
} else {
|
||||
//debug!("no addr from search in {res:?}");
|
||||
let ch = Channel::new(res.channel);
|
||||
if let Some(st) = self.channel_states.get_mut(&ch) {
|
||||
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since }) = &st.value {
|
||||
let dt = SystemTime::now().duration_since(*since).unwrap();
|
||||
if dt > Duration::from_millis(FINDER_TIMEOUT as u64 * TIMEOUT_WARN_FACTOR as u64) {
|
||||
warn!(
|
||||
"NOT FOUND {:5.0} {:5.0}",
|
||||
1e3 * dt.as_secs_f32(),
|
||||
1e3 * res.dt.as_secs_f32()
|
||||
);
|
||||
}
|
||||
st.value = ChannelStateValue::Active(ActiveChannelState::NoAddress);
|
||||
} else {
|
||||
warn!("no address, but state for {ch:?} is not SearchPending: {:?}", st.value);
|
||||
}
|
||||
} else {
|
||||
warn!("can not find channel state for {ch:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("error from search: {e}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> {
|
||||
use DaemonEvent::*;
|
||||
match item {
|
||||
TimerTick => self.handle_timer_tick().await,
|
||||
ChannelAdd(ch) => self.handle_channel_add(ch),
|
||||
ChannelRemove(ch) => self.handle_channel_remove(ch),
|
||||
SearchDone(res) => {
|
||||
info!("handle SearchDone: {res:?}");
|
||||
match res {
|
||||
Ok(a) => {
|
||||
for res in a {
|
||||
if let Some(addr) = &res.addr {
|
||||
let addr = addr.clone();
|
||||
let ch = Channel::new(res.channel);
|
||||
if let Some(st) = self.channel_states.get_mut(&ch) {
|
||||
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { .. }) =
|
||||
&st.value
|
||||
{
|
||||
let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress {
|
||||
addr,
|
||||
state: WithAddressState::Unassigned {
|
||||
assign_at: SystemTime::now(),
|
||||
},
|
||||
});
|
||||
st.value = stnew;
|
||||
} else {
|
||||
warn!("state for {ch:?} is not SearchPending");
|
||||
}
|
||||
} else {
|
||||
warn!("can not find channel state for {ch:?}");
|
||||
}
|
||||
} else {
|
||||
warn!("no addr from search in {res:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("error from search: {e}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
SearchDone(item) => self.handle_search_done(item),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -585,7 +700,7 @@ impl Daemon {
|
||||
let ticker = {
|
||||
let tx = self.tx.clone();
|
||||
async move {
|
||||
let mut ticker = tokio::time::interval(Duration::from_millis(1500));
|
||||
let mut ticker = tokio::time::interval(Duration::from_millis(100));
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
@@ -599,16 +714,13 @@ impl Daemon {
|
||||
taskrun::spawn(ticker);
|
||||
loop {
|
||||
match self.rx.recv().await {
|
||||
Ok(item) => {
|
||||
info!("got daemon event {item:?}");
|
||||
match self.handle_event(item).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("daemon: {e}");
|
||||
break;
|
||||
}
|
||||
Ok(item) => match self.handle_event(item).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("daemon: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("{e}");
|
||||
break;
|
||||
|
||||
@@ -984,7 +984,7 @@ impl CaConn {
|
||||
fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> {
|
||||
// TODO handle subid-not-found which can also be peer error:
|
||||
let cid = *self.cid_by_subid.get(&ev.subid).unwrap();
|
||||
if true {
|
||||
if false {
|
||||
let name = self.name_by_cid(cid);
|
||||
info!("event {name:?} {ev:?}");
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ struct SearchBatch {
|
||||
tgts: VecDeque<usize>,
|
||||
channels: Vec<String>,
|
||||
sids: Vec<SearchId>,
|
||||
done: Vec<SearchId>,
|
||||
done: Vec<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -48,6 +48,7 @@ pub struct FindIocRes {
|
||||
pub query_addr: Option<SocketAddrV4>,
|
||||
pub response_addr: Option<SocketAddrV4>,
|
||||
pub addr: Option<SocketAddrV4>,
|
||||
pub dt: Duration,
|
||||
}
|
||||
|
||||
pub struct FindIocStream {
|
||||
@@ -73,7 +74,7 @@ pub struct FindIocStream {
|
||||
}
|
||||
|
||||
impl FindIocStream {
|
||||
pub fn new(tgts: Vec<SocketAddrV4>) -> Self {
|
||||
pub fn new(tgts: Vec<SocketAddrV4>, batch_run_max: Duration, in_flight_max: usize, batch_size: usize) -> Self {
|
||||
let sock = unsafe { Self::create_socket() }.unwrap();
|
||||
let afd = AsyncFd::new(sock.0).unwrap();
|
||||
Self {
|
||||
@@ -92,9 +93,9 @@ impl FindIocStream {
|
||||
bids_timed_out: BTreeMap::new(),
|
||||
sids_done: BTreeMap::new(),
|
||||
result_for_done_sid_count: 0,
|
||||
in_flight_max: 20,
|
||||
channels_per_batch: 10,
|
||||
batch_run_max: Duration::from_millis(2500),
|
||||
in_flight_max,
|
||||
channels_per_batch: batch_size,
|
||||
batch_run_max,
|
||||
sleeper: Box::pin(tokio::time::sleep(Duration::from_millis(500))),
|
||||
}
|
||||
}
|
||||
@@ -294,23 +295,35 @@ impl FindIocStream {
|
||||
if accounted != ec as usize {
|
||||
info!("unaccounted data ec {} accounted {}", ec, accounted);
|
||||
}
|
||||
if msgs.len() != 2 {
|
||||
info!("expect always 2 commands in the response, instead got {}", msgs.len());
|
||||
if msgs.len() < 1 {
|
||||
warn!("received answer without messages");
|
||||
}
|
||||
for m in &msgs {
|
||||
debug!("m: {m:?}");
|
||||
if msgs.len() == 1 {
|
||||
warn!("received answer with single message: {msgs:?}");
|
||||
}
|
||||
let mut good = true;
|
||||
if let CaMsgTy::VersionRes(v) = msgs[0].ty {
|
||||
if v != 13 {
|
||||
warn!("bad version: {msgs:?}");
|
||||
good = false;
|
||||
}
|
||||
} else {
|
||||
debug!("first message is not a version: {:?}", msgs[0].ty);
|
||||
// Seems like a bug in many IOCs
|
||||
//good = false;
|
||||
}
|
||||
let mut res = Vec::new();
|
||||
for msg in msgs.iter() {
|
||||
match &msg.ty {
|
||||
CaMsgTy::SearchRes(k) => {
|
||||
info!("SearchRes: {k:?}");
|
||||
let addr = SocketAddrV4::new(src_addr, k.tcp_port);
|
||||
res.push((SearchId(k.id), addr));
|
||||
}
|
||||
CaMsgTy::VersionRes(13) => {}
|
||||
_ => {
|
||||
warn!("try_read: unknown message received {:?}", msg.ty);
|
||||
if good {
|
||||
for msg in &msgs[1..] {
|
||||
match &msg.ty {
|
||||
CaMsgTy::SearchRes(k) => {
|
||||
let addr = SocketAddrV4::new(src_addr, k.tcp_port);
|
||||
res.push((SearchId(k.id), addr));
|
||||
}
|
||||
//CaMsgTy::VersionRes(13) => {}
|
||||
_ => {
|
||||
warn!("try_read: unknown message received {:?}", msg.ty);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -338,29 +351,29 @@ impl FindIocStream {
|
||||
}
|
||||
|
||||
fn create_in_flight(&mut self) {
|
||||
let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel);
|
||||
let bid = BatchId(bid as u32);
|
||||
let bid = BatchId(BATCH_ID.fetch_add(1, Ordering::AcqRel) as u32);
|
||||
let mut sids = Vec::new();
|
||||
let mut chs = Vec::new();
|
||||
while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 {
|
||||
let sid = SEARCH_ID2.fetch_add(1, Ordering::AcqRel);
|
||||
let sid = SearchId(sid as u32);
|
||||
let sid = SearchId(SEARCH_ID2.fetch_add(1, Ordering::AcqRel) as u32);
|
||||
self.bid_by_sid.insert(sid.clone(), bid.clone());
|
||||
sids.push(sid);
|
||||
chs.push(self.channels_input.pop_front().unwrap());
|
||||
}
|
||||
let n = chs.len();
|
||||
let batch = SearchBatch {
|
||||
ts_beg: Instant::now(),
|
||||
channels: chs,
|
||||
tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(),
|
||||
sids,
|
||||
done: Vec::new(),
|
||||
done: vec![false; n],
|
||||
};
|
||||
self.in_flight.insert(bid.clone(), batch);
|
||||
self.batch_send_queue.push_back(bid);
|
||||
}
|
||||
|
||||
fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) {
|
||||
let tsnow = Instant::now();
|
||||
let mut sids_remove = Vec::new();
|
||||
for (sid, addr) in res {
|
||||
self.sids_done.insert(sid.clone(), ());
|
||||
@@ -369,39 +382,38 @@ impl FindIocStream {
|
||||
sids_remove.push(sid.clone());
|
||||
match self.in_flight.get_mut(bid) {
|
||||
Some(batch) => {
|
||||
// TGT
|
||||
let mut found_sid = false;
|
||||
for (i2, s2) in batch.sids.iter().enumerate() {
|
||||
if s2 == &sid {
|
||||
found_sid = true;
|
||||
batch.done[i2] = true;
|
||||
match batch.channels.get(i2) {
|
||||
Some(ch) => {
|
||||
let dt = tsnow.saturating_duration_since(batch.ts_beg);
|
||||
let res = FindIocRes {
|
||||
channel: ch.into(),
|
||||
// TODO associate a batch with a specific query address.
|
||||
query_addr: None,
|
||||
response_addr: Some(src.clone()),
|
||||
addr: Some(addr),
|
||||
dt,
|
||||
};
|
||||
self.out_queue.push_back(res);
|
||||
}
|
||||
None => {
|
||||
error!("no matching channel for {sid:?}");
|
||||
error!(
|
||||
"logic error batch sids / channels lens: {} vs {}",
|
||||
batch.sids.len(),
|
||||
batch.channels.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Book keeping:
|
||||
batch.done.push(sid);
|
||||
let mut all_done = true;
|
||||
if batch.done.len() >= batch.sids.len() {
|
||||
for s1 in &batch.sids {
|
||||
if !batch.done.contains(s1) {
|
||||
all_done = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
all_done = false;
|
||||
if !found_sid {
|
||||
error!("can not find sid {sid:?} in batch {bid:?}");
|
||||
}
|
||||
let all_done = batch.done.iter().all(|x| *x);
|
||||
if all_done {
|
||||
self.bids_all_done.insert(bid.clone(), ());
|
||||
self.in_flight.remove(bid);
|
||||
@@ -429,29 +441,33 @@ impl FindIocStream {
|
||||
}
|
||||
|
||||
fn clear_timed_out(&mut self) {
|
||||
let now = Instant::now();
|
||||
let tsnow = Instant::now();
|
||||
let mut bids = Vec::new();
|
||||
let mut sids = Vec::new();
|
||||
let mut chns = Vec::new();
|
||||
let mut dts = Vec::new();
|
||||
for (bid, batch) in &mut self.in_flight {
|
||||
if now.duration_since(batch.ts_beg) > self.batch_run_max {
|
||||
let dt = tsnow.saturating_duration_since(batch.ts_beg);
|
||||
if dt > self.batch_run_max {
|
||||
self.bids_timed_out.insert(bid.clone(), ());
|
||||
for (i2, sid) in batch.sids.iter().enumerate() {
|
||||
if batch.done.contains(sid) == false {
|
||||
if batch.done[i2] == false {
|
||||
debug!("Timeout: {bid:?} {}", batch.channels[i2]);
|
||||
sids.push(sid.clone());
|
||||
chns.push(batch.channels[i2].clone());
|
||||
dts.push(dt);
|
||||
}
|
||||
sids.push(sid.clone());
|
||||
chns.push(batch.channels[i2].clone());
|
||||
}
|
||||
bids.push(bid.clone());
|
||||
}
|
||||
}
|
||||
for (sid, ch) in sids.into_iter().zip(chns) {
|
||||
for ((sid, ch), dt) in sids.into_iter().zip(chns).zip(dts) {
|
||||
let res = FindIocRes {
|
||||
query_addr: None,
|
||||
response_addr: None,
|
||||
channel: ch,
|
||||
addr: None,
|
||||
dt,
|
||||
};
|
||||
self.out_queue.push_back(res);
|
||||
self.bid_by_sid.remove(&sid);
|
||||
@@ -536,9 +552,10 @@ impl Stream for FindIocStream {
|
||||
}
|
||||
None => {
|
||||
if self.bids_all_done.contains_key(&bid) {
|
||||
// TODO count events
|
||||
// Already answered from another target
|
||||
//trace!("bid {bid:?} from batch send queue not in flight AND all done");
|
||||
} else {
|
||||
info!("Batch {bid:?} seems already done");
|
||||
warn!("bid {bid:?} from batch send queue not in flight NOT done");
|
||||
}
|
||||
loop_again = true;
|
||||
}
|
||||
|
||||
@@ -524,6 +524,7 @@ impl CaMsg {
|
||||
CaScalarType::I32 => convert_wave_value!(i32, I32, n, buf),
|
||||
CaScalarType::F32 => convert_wave_value!(f32, F32, n, buf),
|
||||
CaScalarType::F64 => convert_wave_value!(f64, F64, n, buf),
|
||||
CaScalarType::String => CaDataValue::Scalar(CaDataScalarValue::String("todo-array-string".into())),
|
||||
_ => {
|
||||
warn!("TODO conversion array {scalar_type:?}");
|
||||
return Err(Error::with_msg_no_trace(format!(
|
||||
|
||||
@@ -104,7 +104,7 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let mut finder = FindIocStream::new(addrs);
|
||||
let mut finder = FindIocStream::new(addrs, Duration::from_millis(1000), 20, 1);
|
||||
for ch in channels.iter() {
|
||||
finder.push(ch.into());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user