Handle better the case of many unreachable channels

This commit is contained in:
Dominik Werder
2025-01-23 10:22:37 +01:00
parent bd7abaeb16
commit cd8d1e135c
9 changed files with 317 additions and 255 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.5-aa.4"
version = "0.2.5"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
@@ -19,6 +19,7 @@ futures-util = "0.3"
chrono = "0.4.38"
bytes = "1.8.0"
libc = "0.2"
autoerr = "0.0.3"
err = { path = "../../daqbuf-err", package = "daqbuf-err" }
netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" }
taskrun = { path = "../../daqbuffer/crates/taskrun" }

View File

@@ -6,9 +6,12 @@ use stats::IocFinderStats;
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, ThisError)]
#[cstm(name = "CaTools")]
pub enum Error {}
autoerr::create_error_v1!(
name(Error, "CaTools"),
enum variants {
Test,
},
);
pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> {
eprintln!("{:?}", broadcast);

View File

@@ -11,8 +11,6 @@ use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
use dbpg::seriesbychannel::ChannelInfoResult;
use enumfetch::ConnFuture;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -22,7 +20,6 @@ use log::*;
use netpod::channelstatus::ChannelStatus;
use netpod::channelstatus::ChannelStatusClosedReason;
use netpod::timeunits::*;
use netpod::trigger;
use netpod::ttl::RetentionTime;
use netpod::ScalarType;
use netpod::SeriesKind;
@@ -72,7 +69,6 @@ use stats::IntervalEma;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
@@ -150,13 +146,9 @@ macro_rules! trace_monitor_stale {
};
}
/// Returns `true` only when `name` equals the single hard-coded channel name
/// that callers use to gate extra log output.
fn dbg_chn_name(name: impl AsRef<str>) -> bool {
    const DEBUG_CHANNEL: &str = "SINSB02-KCOL-ACT:V-EY21700-MAN-ON-SP";
    let candidate: &str = name.as_ref();
    candidate == DEBUG_CHANNEL
}
fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool {
if let Some(name) = conn.name_by_cid(cid) {
dbg_chn_name(name)
series::dbg::dbg_chn(name)
} else {
false
}
@@ -164,39 +156,37 @@ fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool {
type CaRtWriter = RtWriter<CaWriterValue>;
#[derive(Debug, ThisError)]
#[cstm(name = "NetfetchConn")]
pub enum Error {
NoProtocol,
ProtocolError,
IocIssue,
Protocol(#[from] proto::Error),
RtWriter(#[from] serieswriter::rtwriter::Error),
BinWriter(#[from] serieswriter::binwriter::Error),
SeriesWriter(#[from] serieswriter::writer::Error),
// TODO remove false positive from ThisError derive
#[allow(private_interfaces)]
UnknownCid(Cid),
#[allow(private_interfaces)]
NoNameForCid(Cid),
CreateChannelBadState,
CommonError(#[from] err::Error),
LoopInnerLogicError,
NoSender,
NotSending,
ClosedSending,
NoProgressNoPending,
ShutdownWithQueuesNoProgressNoPending,
Error,
DurationOutOfBounds,
NoFreeCid,
InsertQueues(#[from] scywr::insertqueues::Error),
FutLogic,
MissingTimestamp,
EnumFetch(#[from] enumfetch::Error),
SeriesLookup(#[from] dbpg::seriesbychannel::Error),
Netpod(#[from] netpod::Error),
}
autoerr::create_error_v1!(
name(Error, "NetfetchConn"),
enum variants {
NoProtocol,
ProtocolError,
IocIssue,
Protocol(#[from] proto::Error),
RtWriter(#[from] serieswriter::rtwriter::Error),
BinWriter(#[from] serieswriter::binwriter::Error),
SeriesWriter(#[from] serieswriter::writer::Error),
UnknownCid(Cid),
NoNameForCid(Cid),
CreateChannelBadState,
CommonError(#[from] err::Error),
LoopInnerLogicError,
NoSender,
NotSending,
ClosedSending,
NoProgressNoPending,
ShutdownWithQueuesNoProgressNoPending,
Error,
DurationOutOfBounds,
NoFreeCid,
InsertQueues(#[from] scywr::insertqueues::Error),
FutLogic,
MissingTimestamp,
EnumFetch(#[from] enumfetch::Error),
SeriesLookup(#[from] dbpg::seriesbychannel::Error),
Netpod(#[from] netpod::Error),
},
);
impl err::ToErr for Error {
fn to_err(self) -> err::Error {
@@ -1508,13 +1498,13 @@ impl CaConn {
pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> {
debug!("channel_add {conf:?} {cssid:?}");
if false {
if netpod::trigger.contains(&conf.name()) {
if series::dbg::dbg_chn(&conf.name()) {
self.trace_channel_poll = true;
}
}
if self.cid_by_name(conf.name()).is_some() {
self.stats.channel_add_exists.inc();
if trigger.contains(&conf.name()) {
if series::dbg::dbg_chn(&conf.name()) {
error!("logic error channel already exists {conf:?}");
}
Ok(())
@@ -1522,7 +1512,7 @@ impl CaConn {
let cid = self.cid_by_name_or_insert(conf.name())?;
if self.channels.contains_key(&cid) {
self.stats.channel_add_exists.inc();
if trigger.contains(&conf.name()) {
if series::dbg::dbg_chn(&conf.name()) {
error!("logic error channel already exists {conf:?}");
}
Ok(())
@@ -1680,7 +1670,7 @@ impl CaConn {
self.iqdqs.st_rf3_qu.push_back(x);
}
for (_cid, conf) in &mut self.channels {
if dbg_chn_name(conf.conf.name()) {
if series::dbg::dbg_chn(conf.conf.name()) {
info!("channel_state_on_shutdown {:?}", conf);
}
let chst = &mut conf.state;
@@ -2701,7 +2691,9 @@ impl CaConn {
let cid = Cid(msg.cid);
if let Some(conf) = self.channels.get(&cid) {
let name = conf.conf.name();
debug!("queue event to notive channel create fail {name}");
if series::dbg::dbg_chn(&name) {
info!("queue event to notice channel create fail {name}");
}
let item = CaConnEvent {
ts: tsnow,
value: CaConnEventValue::ChannelCreateFail(name.into()),
@@ -2826,7 +2818,7 @@ impl CaConn {
status_emit_count: 0,
ts_recv_value_status_emit_next: Instant::now(),
};
if dbg_chn_name(created_state.name()) {
if series::dbg::dbg_chn(created_state.name()) {
info!(
"handle_create_chan_res {:?} {}",
created_state.cid,

View File

@@ -25,8 +25,6 @@ use dbpg::seriesbychannel::BoxedSend;
use dbpg::seriesbychannel::CanSendChannelInfoResult;
use dbpg::seriesbychannel::ChannelInfoQuery;
use dbpg::seriesbychannel::ChannelInfoResult;
use err::thiserror;
use err::ThisError;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
@@ -61,7 +59,7 @@ use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use netpod::trigger;
use crate::queueset::QueueSet;
use netpod::OnDrop;
use netpod::TsNano;
use scywr::insertqueues::InsertQueuesTx;
@@ -76,7 +74,7 @@ use taskrun::tokio;
const CHECK_CHANS_PER_TICK: usize = 10000000;
pub const SEARCH_BATCH_MAX: usize = 64;
pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 2;
pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4;
const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(15000);
const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000);
const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(4000);
@@ -86,59 +84,37 @@ const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(0);
const UNASSIGN_FOR_CONFIG_CHANGE_TIMEOUT: Duration = Duration::from_millis(1000 * 10);
const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 3000000;
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! trace2 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; }
#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! trace3 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; }
#[allow(unused)]
macro_rules! trace4 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! trace4 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; }
#[allow(unused)]
macro_rules! trace_health_update { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_health_update { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; }
#[allow(unused)]
macro_rules! trace_channel_state { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_channel_state { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; }
#[derive(Debug, ThisError)]
#[cstm(name = "CaConnSet")]
pub enum Error {
ChannelSend,
TaskJoin(#[from] tokio::task::JoinError),
SeriesLookup(#[from] dbpg::seriesbychannel::Error),
Beacons(#[from] crate::ca::beacons::Error),
SeriesWriter(#[from] serieswriter::writer::Error),
ExpectIpv4,
UnknownCssid,
Regex(#[from] regex::Error),
MissingChannelInfoChannelTx,
UnexpectedChannelDummyState,
CaConnEndWithoutReason,
PushCmdsNoSendInProgress(SocketAddr),
SenderPollingSend,
NoProgressNoPending,
IocFinder(#[from] crate::ca::finder::Error),
ChannelAssignedWithoutConnRess,
}
autoerr::create_error_v1!(
name(Error, "CaConnSet"),
enum variants {
ChannelSend,
TaskJoin(#[from] tokio::task::JoinError),
SeriesLookup(#[from] dbpg::seriesbychannel::Error),
Beacons(#[from] crate::ca::beacons::Error),
SeriesWriter(#[from] serieswriter::writer::Error),
ExpectIpv4,
UnknownCssid,
Regex(#[from] regex::Error),
MissingChannelInfoChannelTx,
UnexpectedChannelDummyState,
CaConnEndWithoutReason,
PushCmdsNoSendInProgress(SocketAddr),
SenderPollingSend,
NoProgressNoPending,
IocFinder(#[from] crate::ca::finder::Error),
ChannelAssignedWithoutConnRess,
},
);
impl<T> From<async_channel::SendError<T>> for Error {
fn from(_value: async_channel::SendError<T>) -> Self {
@@ -440,6 +416,7 @@ pub struct CaConnSet {
find_ioc_query_queue: VecDeque<IocAddrQuery>,
find_ioc_query_sender: Pin<Box<SenderPolling<IocAddrQuery>>>,
find_ioc_res_rx: Pin<Box<Receiver<VecDeque<FindIocRes>>>>,
find_ioc_queue_set: QueueSet<ChannelName>,
iqtx: Pin<Box<InsertQueuesTx>>,
storage_insert_queue_l1: VecDeque<QueryItem>,
storage_insert_queue: VecDeque<VecDeque<QueryItem>>,
@@ -507,6 +484,7 @@ impl CaConnSet {
find_ioc_query_queue: VecDeque::new(),
find_ioc_query_sender: Box::pin(SenderPolling::new(find_ioc_query_tx)),
find_ioc_res_rx: Box::pin(find_ioc_res_rx),
find_ioc_queue_set: QueueSet::new(),
iqtx: Box::pin(iqtx.clone()),
storage_insert_queue_l1: VecDeque::new(),
storage_insert_queue: VecDeque::new(),
@@ -691,6 +669,9 @@ impl CaConnSet {
WithStatusSeriesIdStateInner::UnassigningForConfigChange(st4) => {
st4.config_new = cmd.ch_cfg;
}
WithStatusSeriesIdStateInner::AddrSearchPlanned { .. } => {
ress.chst.config = cmd.ch_cfg;
}
},
},
ChannelStateValue::ToRemove { .. } => {
@@ -809,7 +790,7 @@ impl CaConnSet {
return Ok(());
}
self.stats.channel_status_series_found().inc();
if trigger.contains(&name) {
if series::dbg::dbg_chn(&name) {
info!("handle_add_channel_with_status_id {cmd:?}");
}
let ch = ChannelName::new(name.into());
@@ -874,7 +855,7 @@ impl CaConnSet {
} else {
return Err(Error::ExpectIpv4);
};
if trigger.contains(&name) {
if series::dbg::dbg_chn(&name) {
info!("handle_add_channel_with_addr {cmd:?}");
}
let ch = ChannelName::new(name.into());
@@ -974,6 +955,9 @@ impl CaConnSet {
WithStatusSeriesIdStateInner::UnassigningForConfigChange(..) => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::AddrSearchPlanned { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
},
},
ChannelStateValue::ToRemove { .. } => {}
@@ -990,7 +974,7 @@ impl CaConnSet {
}
for res in results {
let ch = ChannelName::new(res.channel.clone());
if trigger.contains(&ch.name()) {
if series::dbg::dbg_chn(&ch.name()) {
info!("handle_ioc_query_result {res:?}");
}
if let Some(chst) = self.channel_states.get_mut(&ch) {
@@ -1140,18 +1124,34 @@ impl CaConnSet {
}
fn handle_channel_create_fail(&mut self, addr: SocketAddr, name: String) -> Result<(), Error> {
trace!("handle_channel_create_fail {addr} {name}");
let tsnow = SystemTime::now();
if series::dbg::dbg_chn(&name) {
info!("handle_channel_create_fail {:?} {:?}", name, addr);
} else {
trace!("handle_channel_create_fail {:?} {:?}", name, addr);
}
let stnow = SystemTime::now();
let ch = ChannelName::new(name);
if let Some(st1) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(st2) = &mut st1.value {
if let ActiveChannelState::WithStatusSeriesId(st3) = st2 {
trace!("handle_channel_create_fail {addr} {ch:?} set to MaybeWrongAddress");
if series::dbg::dbg_chn(ch.name()) {
info!(
"handle_channel_create_fail {:?} {:?} set to MaybeWrongAddress",
ch, addr
);
} else {
trace!(
"handle_channel_create_fail {:?} {:?} set to MaybeWrongAddress",
ch,
addr
);
}
bump_backoff(&mut st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(MaybeWrongAddressState::new(
tsnow,
st3.addr_find_backoff,
));
let snew = MaybeWrongAddressState::new(stnow, st3.addr_find_backoff);
if series::dbg::dbg_chn(ch.name()) {
info!("handle_channel_create_fail update state {:?}", snew);
}
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(snew);
}
}
}
@@ -1218,6 +1218,7 @@ impl CaConnSet {
self.handle_add_channel_with_addr(cmd)?;
Ok(())
}
WithStatusSeriesIdStateInner::AddrSearchPlanned { .. } => Ok(()),
},
},
ChannelStateValue::ToRemove { .. } => {
@@ -1238,7 +1239,8 @@ impl CaConnSet {
}
fn transition_channels_to_maybe_wrong_address(&mut self, addr: SocketAddr) -> Result<(), Error> {
let tsnow = SystemTime::now();
// TODO take a "reason" as parameter for status emit.
let stnow = SystemTime::now();
for (ch, st1) in self.channel_states.iter_mut() {
match &mut st1.value {
ChannelStateValue::Active(st2) => match st2 {
@@ -1249,37 +1251,25 @@ impl CaConnSet {
match &mut st3.inner {
AddrSearchPending { since: _ } => {}
WithAddress { addr: addr2, state: _ } => {
if trigger.contains(&ch.name()) {
info!(" connect fail, maybe wrong address for {} {}", addr, ch.name());
}
if SocketAddr::V4(*addr2) == addr {
if trigger.contains(&ch.name()) {
info!("transition_channels_to_maybe_wrong_address AA {addr}");
}
bump_backoff(&mut st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(
MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff),
);
if trigger.contains(&ch.name()) {
info!("transition_channels_to_maybe_wrong_address BB {:?}", st1);
let snew = MaybeWrongAddressState::new(stnow, st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(snew.clone());
if series::dbg::dbg_chn(&ch.name()) {
info!(
"transition_channels_to_maybe_wrong_address BB {:?} {:?} {:?} {:?}",
ch, addr, snew, st1
);
}
} else {
if trigger.contains(&ch.name()) {
info!("transition_channels_to_maybe_wrong_address BB {addr}");
}
bump_backoff(&mut st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(
MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff),
);
if trigger.contains(&ch.name()) {
info!("transition_channels_to_maybe_wrong_address BB {:?}", st1);
}
// nothing to do
}
}
UnknownAddress { since: _ } => {}
NoAddress { since: _ } => {}
MaybeWrongAddress(_) => {}
UnassigningForConfigChange(_) => {}
AddrSearchPlanned { .. } => {}
}
}
},
@@ -1516,34 +1506,14 @@ impl CaConnSet {
}
}
ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner {
WithStatusSeriesIdStateInner::UnknownAddress { since } => {
if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ {
if since.checked_add(UNKNOWN_ADDRESS_STAY).unwrap() < stnow {
if false {
error!("TODO trigger address search from state UnknownAddress");
if true {
std::process::exit(1);
}
if false {
// TODO
search_pending_count += 1;
st3.inner =
WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };
}
} else {
search_pending_count += 1;
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };
let qu = IocAddrQuery::uncached(ch.name().into());
self.find_ioc_query_queue.push_back(qu);
self.stats.ioc_search_start().inc();
}
}
}
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
self.find_ioc_queue_set.push_back(ch.clone());
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPlanned { since: stnow };
}
WithStatusSeriesIdStateInner::AddrSearchPending { since } => {
let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > SEARCH_PENDING_TIMEOUT {
debug!("TODO should receive some error indication instead of timeout for {ch:?}");
info!("should receive some error indication instead of timeout for {ch:?}");
st3.inner = WithStatusSeriesIdStateInner::NoAddress { since: stnow };
search_pending_count -= 1;
}
@@ -1582,9 +1552,14 @@ impl CaConnSet {
let addr = SocketAddr::V4(*addr_v4);
cmd_remove_channel.push((addr, ch.clone()));
bump_backoff(&mut st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(
MaybeWrongAddressState::new(stnow, st3.addr_find_backoff),
);
let snew = MaybeWrongAddressState::new(stnow, st3.addr_find_backoff);
if series::dbg::dbg_chn(ch.name()) {
info!(
"check_channel_states update state {:?} {:?} {:?}",
ch, addr, snew
);
}
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(snew);
let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone());
let (tsev, val) = item.to_ts_val();
let deque = &mut item_deque;
@@ -1606,16 +1581,17 @@ impl CaConnSet {
}
WithStatusSeriesIdStateInner::MaybeWrongAddress(st4) => {
if st4.since + st4.backoff_dt < stnow {
if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ {
trace!("try again channel after MaybeWrongAddress");
if trigger.contains(&ch.name()) {
info!("issue ioc search for {}", ch.name());
}
search_pending_count += 1;
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };
let qu = IocAddrQuery::uncached(ch.name().into());
self.find_ioc_query_queue.push_back(qu);
self.stats.ioc_search_start().inc();
if series::dbg::dbg_chn(ch.name()) {
info!(
"check_channel_states MaybeWrongAddress set to AddrSearchPlanned {:?}",
ch
);
}
self.find_ioc_queue_set.push_back(ch.clone());
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPlanned { since: stnow };
} else {
if series::dbg::dbg_chn(ch.name()) {
// info!("MaybeWrongAddress back off {:?}", ch);
}
}
}
@@ -1624,6 +1600,12 @@ impl CaConnSet {
debug!("timeout unassign for config change");
}
}
WithStatusSeriesIdStateInner::AddrSearchPlanned { since: _ } => {
// TODO record elapsed from since for metrics
if series::dbg::dbg_chn(ch.name()) {
info!("AddrSearchPlanned {:?} {:?}", ch, search_pending_count);
}
}
},
},
ChannelStateValue::ToRemove { .. } => {
@@ -1638,6 +1620,34 @@ impl CaConnSet {
break;
}
}
loop {
break if search_pending_count >= CURRENT_SEARCH_PENDING_MAX as _ {
} else {
if let Some(ch) = self.find_ioc_queue_set.pop_front() {
if let Some(st1) = self.channel_states.get_mut(&ch) {
match &mut st1.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::WithStatusSeriesId(st3) => {
if series::dbg::dbg_chn(ch.name()) {
info!("issue ioc search {:?}", ch);
} else {
trace!("issue ioc search {:?}", ch);
}
search_pending_count += 1;
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };
let qu = IocAddrQuery::uncached(ch.name().into());
self.find_ioc_query_queue.push_back(qu);
self.stats.ioc_search_start().inc();
}
_ => {}
},
_ => {}
}
}
continue;
}
};
}
self.storage_insert_queue.push_back(item_deque);
for (addr, ch) in cmd_remove_channel {
if let Some(g) = self.ca_conn_ress.get_mut(&addr) {
@@ -1706,6 +1716,9 @@ impl CaConnSet {
WithStatusSeriesIdStateInner::UnassigningForConfigChange(_) => {
assigned += 1;
}
WithStatusSeriesIdStateInner::AddrSearchPlanned { .. } => {
no_address += 1;
}
},
},
ChannelStateValue::ToRemove { .. } => {

View File

@@ -21,7 +21,7 @@ use std::time::Instant;
use taskrun::tokio;
use tokio::task::JoinHandle;
const SEARCH_DB_PIPELINE_LEN: usize = 2;
const SEARCH_DB_WORKER_CNT: usize = 2;
macro_rules! debug_batch { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) }
@@ -40,28 +40,35 @@ autoerr::create_error_v1!(
fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
let mut ret = VecDeque::new();
for row in rows {
let ch: Result<String, _> = row.try_get(0);
if let Ok(ch) = ch {
if let Some(addr) = row.get::<_, Option<String>>(1) {
let addr = addr.parse().map_or(None, |x| Some(x));
let item = FindIocRes {
channel: ch,
response_addr: None,
addr,
dt: Duration::from_millis(0),
};
ret.push_back(item);
} else {
let item = FindIocRes {
channel: ch,
response_addr: None,
addr: None,
dt: Duration::from_millis(0),
};
ret.push_back(item);
let n: Result<i32, _> = row.try_get(0);
let ch: Result<String, _> = row.try_get(1);
match (n, ch) {
(Ok(n), Ok(ch)) => {
if let Some(addr) = row.get::<_, Option<String>>(3) {
let addr = addr.parse().map_or(None, |x| Some(x));
let item = FindIocRes {
channel: ch,
response_addr: None,
addr,
dt: Duration::from_millis(0),
};
ret.push_back(item);
} else {
let item = FindIocRes {
channel: ch,
response_addr: None,
addr: None,
dt: Duration::from_millis(0),
};
ret.push_back(item);
}
}
(_, Err(e)) => {
error!("bad string from pg: {}", e);
}
(Err(e), _) => {
error!("bad int from pg: {}", e);
}
} else if let Err(e) = ch {
error!("bad string from pg: {e:?}");
}
}
ret
@@ -110,14 +117,10 @@ async fn finder_worker(
stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
// TODO do something with join handle
let (batch_rx, jh_batch) = batchtools::batcher::batch(
SEARCH_BATCH_MAX,
Duration::from_millis(200),
SEARCH_DB_PIPELINE_LEN,
qrx,
);
let (batch_rx, jh_batch) =
batchtools::batcher::batch(SEARCH_BATCH_MAX, Duration::from_millis(200), SEARCH_DB_WORKER_CNT, qrx);
let mut jhs = Vec::new();
for _ in 0..SEARCH_DB_PIPELINE_LEN {
for _ in 0..SEARCH_DB_WORKER_CNT {
let jh = tokio::spawn(finder_worker_single(
batch_rx.clone(),
tx.clone(),
@@ -146,25 +149,37 @@ async fn finder_worker_single(
debug!("finder_worker_single make_pg_client");
let (pg, jh) = make_pg_client(&db).await?;
let sql = concat!(
"with q1 as (select * from unnest($2::text[]) as unn (ch))",
" select distinct on (tt.facility, tt.channel) tt.channel, tt.addr",
" from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.archived = 0 and tt.addr is not null",
" order by tt.facility, tt.channel, tsmod desc",
"with q1 as (select * from unnest($2::int[], $3::text[]) as unn (n, ch))",
" select distinct on (q1.n) q1.n, q1.ch, tt.channel, tt.addr, tt.tsmod",
" from q1 left join ioc_by_channel_log tt",
" on tt.channel = q1.ch and tt.facility = $1 and tt.archived = 0 and tt.addr is not null",
" order by q1.n, tsmod desc",
);
let qu_select_multi = pg.prepare(sql).await?;
let mut resdiff = 0;
loop {
match inp.recv().await {
Ok(batch) => {
if batch.iter().filter(|x| crate::dbg_chn(x.name())).next().is_some() {
info!("SEARCHING FOR DBG");
};
for e in batch.iter().filter(|x| series::dbg::dbg_chn(x.name())) {
info!("searching database for {:?}", e);
}
stats.dbsearcher_batch_recv().inc();
stats.dbsearcher_item_recv().add(batch.len() as _);
let ts1 = Instant::now();
let (batch, pass_through) = batch.into_iter().fold((Vec::new(), Vec::new()), |(mut a, mut b), x| {
if x.use_cache() {
a.push(x);
} else {
b.push(x);
}
(a, b)
});
debug_batch!("run query batch len {}", batch.len());
let names: Vec<_> = batch.iter().filter(|x| x.use_cache()).map(|x| x.name()).collect();
let qres = pg.query(&qu_select_multi, &[&backend, &names]).await;
let names: Vec<_> = batch
.iter()
.map(|x| if x.use_cache() { x.name() } else { "---------------" })
.collect();
let ns: Vec<_> = names.iter().enumerate().map(|(i, _)| i as i32).collect();
let qres = pg.query(&qu_select_multi, &[&backend, &ns, &names]).await;
let dt = ts1.elapsed();
debug_batch!(
"done query batch len {}: {} {:.3}ms",
@@ -178,57 +193,42 @@ async fn finder_worker_single(
match qres {
Ok(rows) => {
stats.dbsearcher_select_res_0().add(rows.len() as _);
if rows.len() > batch.len() {
if rows.len() != batch.len() {
stats.dbsearcher_select_error_len_mismatch().inc();
} else if rows.len() < batch.len() {
resdiff += batch.len() - rows.len();
error!("query result len {} batch len {}", rows.len(), batch.len());
tokio::time::sleep(Duration::from_millis(1000)).await;
continue;
}
let nbatch = batch.len();
trace_batch!("received results {} resdiff {}", rows.len(), resdiff);
let items = transform_pgres(rows);
let mut to_add = Vec::new();
{
let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect();
for e in batch {
if !names.contains_key(e.name_string()) {
let item = FindIocRes {
channel: e.name().into(),
response_addr: None,
addr: None,
dt: Duration::from_millis(0),
};
to_add.push(item);
}
for e in items.iter() {
if series::dbg::dbg_chn(&e.channel) {
info!("found in database {:?}", e);
}
}
let mut items = items;
items.extend(to_add.into_iter());
let items = items;
for e in &items {
trace!("found in database: {e:?}");
}
for e in items.iter() {
if crate::dbg_chn(&e.channel) {
info!("FOUND {e:?}");
}
for e in pass_through {
let x = FindIocRes {
channel: e.name().into(),
response_addr: None,
addr: None,
dt: Duration::from_millis(0),
};
items.push_back(x);
}
let items_len = items.len();
if items_len != nbatch {
stats.dbsearcher_select_error_len_mismatch().inc();
}
match tx.send(items).await {
Ok(_) => {
stats.dbsearcher_batch_send().inc();
stats.dbsearcher_item_send().add(items_len as _);
}
Err(e) => {
error!("finder sees: {e}");
error!("finder sees: {}", e);
break;
}
}
}
Err(e) => {
error!("finder sees error: {e}");
error!("finder sees error: {}", e);
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}

View File

@@ -97,6 +97,10 @@ pub enum WithStatusSeriesIdStateInner {
},
MaybeWrongAddress(MaybeWrongAddressState),
UnassigningForConfigChange(UnassigningForConfigChangeState),
AddrSearchPlanned {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
}
#[derive(Debug, Clone, Serialize)]
@@ -109,7 +113,7 @@ pub struct MaybeWrongAddressState {
impl MaybeWrongAddressState {
pub fn new(since: SystemTime, backoff_cnt: u32) -> Self {
let f = 2. + 60. * (backoff_cnt as f32 / 5.).tanh();
let dtms = 1e-3 * f;
let dtms = 1e3 * f;
Self {
since,
backoff_dt: Duration::from_millis(dtms as u64),

View File

@@ -6,6 +6,7 @@ pub mod linuxhelper;
pub mod metrics;
pub mod netbuf;
pub mod polltimer;
pub mod queueset;
pub mod ratelimit;
pub mod rt;
#[cfg(test)]
@@ -21,8 +22,3 @@ pub fn log_test() {
debug!("log-test");
trace!("log-test");
}
/// Reports whether `chn` is one of the channel names hard-coded below;
/// callers use the result to gate extra log output.
pub fn dbg_chn(chn: &str) -> bool {
    const DEBUG_CHANNELS: [&str; 1] = ["SINEG01:QE-B1-OP"];
    DEBUG_CHANNELS.iter().any(|&candidate| candidate == chn)
}

View File

@@ -148,10 +148,16 @@ fn system_time_epoch(x: &SystemTime) -> bool {
*x == SystemTime::UNIX_EPOCH
}
/// Reason attached to `ConnectionState::Unreachable`, so the serialized
/// channel state shows which unreachable case a channel is in.
#[derive(Debug, Serialize)]
enum Unreachable {
/// Reported for channels whose inner state is `NoAddress`.
NoAddress,
/// Reported for channels whose inner state is `MaybeWrongAddress`.
MaybeWrongAddress,
}
#[derive(Debug, Serialize)]
enum ConnectionState {
Connecting,
Unreachable,
Unreachable(Unreachable),
Disconnected,
Connected,
Error,
@@ -246,19 +252,29 @@ async fn channel_states_try(
states.channels.insert(k, chst);
}
WithStatusSeriesIdStateInner::NoAddress { .. } => {
let chst =
ChannelState::connecting_addr(st1.config, None, ConnectionState::Unreachable);
let chst = ChannelState::connecting_addr(
st1.config,
None,
ConnectionState::Unreachable(Unreachable::NoAddress),
);
states.channels.insert(k, chst);
}
WithStatusSeriesIdStateInner::MaybeWrongAddress(..) => {
let chst =
ChannelState::connecting_addr(st1.config, None, ConnectionState::Unreachable);
let chst = ChannelState::connecting_addr(
st1.config,
None,
ConnectionState::Unreachable(Unreachable::MaybeWrongAddress),
);
states.channels.insert(k, chst);
}
WithStatusSeriesIdStateInner::UnassigningForConfigChange(_) => {
let chst = ChannelState::connecting_addr(st1.config, None, ConnectionState::Connecting);
states.channels.insert(k, chst);
}
WithStatusSeriesIdStateInner::AddrSearchPlanned { .. } => {
let chst = ChannelState::connecting(st1.config);
states.channels.insert(k, chst);
}
}
}
}

37
netfetch/src/queueset.rs Normal file
View File

@@ -0,0 +1,37 @@
use hashbrown::HashSet;
use std::collections::VecDeque;
use std::hash::Hash;
/// FIFO queue that holds each distinct element at most once.
///
/// The `VecDeque` keeps insertion order while the hash set mirrors the queued
/// elements, so duplicate detection does not have to scan the queue.
/// Invariant: `queue` and `set` always contain exactly the same elements.
#[derive(Debug)]
pub struct QueueSet<T> {
    queue: VecDeque<T>,
    set: HashSet<T>,
}

impl<T> QueueSet<T>
where
    T: Clone + Eq + Hash,
{
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self {
            queue: VecDeque::new(),
            set: HashSet::new(),
        }
    }

    /// Appends `e` to the back of the queue.
    ///
    /// If an equal element is already queued the call is a no-op and the
    /// existing element keeps its position.
    pub fn push_back(&mut self, e: T) {
        // Check membership first so that the clone is only paid for elements
        // which actually get enqueued.
        if !self.set.contains(&e) {
            self.set.insert(e.clone());
            self.queue.push_back(e);
        }
    }

    /// Removes and returns the oldest element, or `None` if the queue is empty.
    ///
    /// After removal an equal element can be pushed again.
    pub fn pop_front(&mut self) -> Option<T> {
        let x = self.queue.pop_front()?;
        self.set.remove(&x);
        Some(x)
    }

    /// Returns the number of queued elements.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` if no element is queued.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns `true` if an element equal to `e` is currently queued.
    pub fn contains(&self, e: &T) -> bool {
        self.set.contains(e)
    }
}

impl<T> Default for QueueSet<T>
where
    T: Clone + Eq + Hash,
{
    /// Equivalent to [`QueueSet::new`].
    fn default() -> Self {
        Self::new()
    }
}