From cd8d1e135c4ca8ce5c91672a347be1ac4bef0921 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 23 Jan 2025 10:22:37 +0100 Subject: [PATCH] Handle better the case of many unreachable channels --- daqingest/Cargo.toml | 3 +- daqingest/src/tools/catools.rs | 9 +- netfetch/src/ca/conn.rs | 88 ++++++------ netfetch/src/ca/connset.rs | 255 +++++++++++++++++---------------- netfetch/src/ca/finder.rs | 142 +++++++++--------- netfetch/src/ca/statemap.rs | 6 +- netfetch/src/lib.rs | 6 +- netfetch/src/metrics/status.rs | 26 +++- netfetch/src/queueset.rs | 37 +++++ 9 files changed, 317 insertions(+), 255 deletions(-) create mode 100644 netfetch/src/queueset.rs diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 14a4e05..cec4a88 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.5-aa.4" +version = "0.2.5" authors = ["Dominik Werder "] edition = "2021" @@ -19,6 +19,7 @@ futures-util = "0.3" chrono = "0.4.38" bytes = "1.8.0" libc = "0.2" +autoerr = "0.0.3" err = { path = "../../daqbuf-err", package = "daqbuf-err" } netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } diff --git a/daqingest/src/tools/catools.rs b/daqingest/src/tools/catools.rs index 47f43f5..73458a9 100644 --- a/daqingest/src/tools/catools.rs +++ b/daqingest/src/tools/catools.rs @@ -6,9 +6,12 @@ use stats::IocFinderStats; use std::sync::Arc; use std::time::Duration; -#[derive(Debug, ThisError)] -#[cstm(name = "CaTools")] -pub enum Error {} +autoerr::create_error_v1!( + name(Error, "CaTools"), + enum variants { + Test, + }, +); pub async fn find(cmd: CaFind, broadcast: String) -> Result<(), Error> { eprintln!("{:?}", broadcast); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index bfd0a5c..a2d8772 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -11,8 +11,6 @@ use core::fmt; use dbpg::seriesbychannel::ChannelInfoQuery; use dbpg::seriesbychannel::ChannelInfoResult; use enumfetch::ConnFuture; -use err::thiserror; -use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -22,7 +20,6 @@ use log::*; use netpod::channelstatus::ChannelStatus; use netpod::channelstatus::ChannelStatusClosedReason; use netpod::timeunits::*; -use netpod::trigger; use netpod::ttl::RetentionTime; use netpod::ScalarType; use netpod::SeriesKind; @@ -72,7 +69,6 @@ use stats::IntervalEma; use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::SocketAddrV4; -use std::ops::ControlFlow; use std::pin::Pin; use std::sync::atomic; use std::sync::atomic::AtomicUsize; @@ -150,13 +146,9 @@ macro_rules! trace_monitor_stale { }; } -fn dbg_chn_name(name: impl AsRef) -> bool { - name.as_ref() == "SINSB02-KCOL-ACT:V-EY21700-MAN-ON-SP" -} - fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool { if let Some(name) = conn.name_by_cid(cid) { - dbg_chn_name(name) + series::dbg::dbg_chn(name) } else { false } @@ -164,39 +156,37 @@ fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool { type CaRtWriter = RtWriter; -#[derive(Debug, ThisError)] -#[cstm(name = "NetfetchConn")] -pub enum Error { - NoProtocol, - ProtocolError, - IocIssue, - Protocol(#[from] proto::Error), - RtWriter(#[from] serieswriter::rtwriter::Error), - BinWriter(#[from] serieswriter::binwriter::Error), - SeriesWriter(#[from] serieswriter::writer::Error), - // TODO remove false positive from ThisError derive - #[allow(private_interfaces)] - UnknownCid(Cid), - #[allow(private_interfaces)] - NoNameForCid(Cid), - CreateChannelBadState, - CommonError(#[from] err::Error), - LoopInnerLogicError, - NoSender, - NotSending, - ClosedSending, - NoProgressNoPending, - ShutdownWithQueuesNoProgressNoPending, - Error, - DurationOutOfBounds, - NoFreeCid, - InsertQueues(#[from] scywr::insertqueues::Error), - FutLogic, - MissingTimestamp, - EnumFetch(#[from] enumfetch::Error), - SeriesLookup(#[from] dbpg::seriesbychannel::Error), - Netpod(#[from] netpod::Error), -} +autoerr::create_error_v1!( + name(Error, "NetfetchConn"), + enum variants { + NoProtocol, + ProtocolError, + IocIssue, + Protocol(#[from] proto::Error), + RtWriter(#[from] serieswriter::rtwriter::Error), + BinWriter(#[from] serieswriter::binwriter::Error), + SeriesWriter(#[from] serieswriter::writer::Error), + UnknownCid(Cid), + NoNameForCid(Cid), + CreateChannelBadState, + CommonError(#[from] err::Error), + LoopInnerLogicError, + NoSender, + NotSending, + ClosedSending, + NoProgressNoPending, + ShutdownWithQueuesNoProgressNoPending, + Error, + DurationOutOfBounds, + NoFreeCid, + InsertQueues(#[from] scywr::insertqueues::Error), + FutLogic, + MissingTimestamp, + EnumFetch(#[from] enumfetch::Error), + SeriesLookup(#[from] dbpg::seriesbychannel::Error), + Netpod(#[from] netpod::Error), + }, +); impl err::ToErr for Error { fn to_err(self) -> err::Error { @@ -1508,13 +1498,13 @@ impl CaConn { pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> { debug!("channel_add {conf:?} {cssid:?}"); if false { - if netpod::trigger.contains(&conf.name()) { + if series::dbg::dbg_chn(&conf.name()) { self.trace_channel_poll = true; } } if self.cid_by_name(conf.name()).is_some() { self.stats.channel_add_exists.inc(); - if trigger.contains(&conf.name()) { + if series::dbg::dbg_chn(&conf.name()) { error!("logic error channel already exists {conf:?}"); } Ok(()) @@ -1522,7 +1512,7 @@ impl CaConn { let cid = self.cid_by_name_or_insert(conf.name())?; if self.channels.contains_key(&cid) { self.stats.channel_add_exists.inc(); - if trigger.contains(&conf.name()) { + if series::dbg::dbg_chn(&conf.name()) { error!("logic error channel already exists {conf:?}"); } Ok(()) @@ -1680,7 +1670,7 @@ impl CaConn { self.iqdqs.st_rf3_qu.push_back(x); } for (_cid, conf) in &mut self.channels { - if dbg_chn_name(conf.conf.name()) { + if series::dbg::dbg_chn(conf.conf.name()) { info!("channel_state_on_shutdown {:?}", conf); } let chst = &mut conf.state; @@ -2701,7 +2691,9 @@ impl CaConn { let cid = Cid(msg.cid); if let Some(conf) = self.channels.get(&cid) { let name = conf.conf.name(); - debug!("queue event to notive channel create fail {name}"); + if series::dbg::dbg_chn(&name) { + info!("queue event to notice channel create fail {name}"); + } let item = CaConnEvent { ts: tsnow, value: CaConnEventValue::ChannelCreateFail(name.into()), @@ -2826,7 +2818,7 @@ impl CaConn { status_emit_count: 0, ts_recv_value_status_emit_next: Instant::now(), }; - if dbg_chn_name(created_state.name()) { + if series::dbg::dbg_chn(created_state.name()) { info!( "handle_create_chan_res {:?} {}", created_state.cid, diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index a56b43a..65c92c2 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -25,8 +25,6 @@ use dbpg::seriesbychannel::BoxedSend; use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; use dbpg::seriesbychannel::ChannelInfoResult; -use err::thiserror; -use err::ThisError; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; @@ -61,7 +59,7 @@ use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; -use netpod::trigger; +use crate::queueset::QueueSet; use netpod::OnDrop; use netpod::TsNano; use scywr::insertqueues::InsertQueuesTx; @@ -76,7 +74,7 @@ use taskrun::tokio; const CHECK_CHANS_PER_TICK: usize = 10000000; pub const SEARCH_BATCH_MAX: usize = 64; -pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 2; +pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4; const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(15000); const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000); const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(4000); @@ -86,59 +84,37 @@ const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(0); const UNASSIGN_FOR_CONFIG_CHANGE_TIMEOUT: Duration = Duration::from_millis(1000 * 10); const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 3000000; -#[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! trace2 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; } -#[allow(unused)] -macro_rules! trace3 { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! trace3 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; } -#[allow(unused)] -macro_rules! trace4 { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! trace4 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; } -#[allow(unused)] -macro_rules! trace_health_update { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_health_update { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; } -#[allow(unused)] -macro_rules! trace_channel_state { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_channel_state { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; } -#[derive(Debug, ThisError)] -#[cstm(name = "CaConnSet")] -pub enum Error { - ChannelSend, - TaskJoin(#[from] tokio::task::JoinError), - SeriesLookup(#[from] dbpg::seriesbychannel::Error), - Beacons(#[from] crate::ca::beacons::Error), - SeriesWriter(#[from] serieswriter::writer::Error), - ExpectIpv4, - UnknownCssid, - Regex(#[from] regex::Error), - MissingChannelInfoChannelTx, - UnexpectedChannelDummyState, - CaConnEndWithoutReason, - PushCmdsNoSendInProgress(SocketAddr), - SenderPollingSend, - NoProgressNoPending, - IocFinder(#[from] crate::ca::finder::Error), - ChannelAssignedWithoutConnRess, -} +autoerr::create_error_v1!( + name(Error, "CaConnSet"), + enum variants { + ChannelSend, + TaskJoin(#[from] tokio::task::JoinError), + SeriesLookup(#[from] dbpg::seriesbychannel::Error), + Beacons(#[from] crate::ca::beacons::Error), + SeriesWriter(#[from] serieswriter::writer::Error), + ExpectIpv4, + UnknownCssid, + Regex(#[from] regex::Error), + MissingChannelInfoChannelTx, + UnexpectedChannelDummyState, + CaConnEndWithoutReason, + PushCmdsNoSendInProgress(SocketAddr), + SenderPollingSend, + NoProgressNoPending, + IocFinder(#[from] crate::ca::finder::Error), + ChannelAssignedWithoutConnRess, + }, +); impl From> for Error { fn from(_value: async_channel::SendError) -> Self { @@ -440,6 +416,7 @@ pub struct CaConnSet { find_ioc_query_queue: VecDeque, find_ioc_query_sender: Pin>>, find_ioc_res_rx: Pin>>>, + find_ioc_queue_set: QueueSet, iqtx: Pin>, storage_insert_queue_l1: VecDeque, storage_insert_queue: VecDeque>, @@ -507,6 +484,7 @@ impl CaConnSet { find_ioc_query_queue: VecDeque::new(), find_ioc_query_sender: Box::pin(SenderPolling::new(find_ioc_query_tx)), find_ioc_res_rx: Box::pin(find_ioc_res_rx), + find_ioc_queue_set: QueueSet::new(), iqtx: Box::pin(iqtx.clone()), storage_insert_queue_l1: VecDeque::new(), storage_insert_queue: VecDeque::new(), @@ -691,6 +669,9 @@ impl CaConnSet { WithStatusSeriesIdStateInner::UnassigningForConfigChange(st4) => { st4.config_new = cmd.ch_cfg; } + WithStatusSeriesIdStateInner::AddrSearchPlanned { .. } => { + ress.chst.config = cmd.ch_cfg; + } }, }, ChannelStateValue::ToRemove { .. } => { @@ -809,7 +790,7 @@ impl CaConnSet { return Ok(()); } self.stats.channel_status_series_found().inc(); - if trigger.contains(&name) { + if series::dbg::dbg_chn(&name) { info!("handle_add_channel_with_status_id {cmd:?}"); } let ch = ChannelName::new(name.into()); @@ -874,7 +855,7 @@ impl CaConnSet { } else { return Err(Error::ExpectIpv4); }; - if trigger.contains(&name) { + if series::dbg::dbg_chn(&name) { info!("handle_add_channel_with_addr {cmd:?}"); } let ch = ChannelName::new(name.into()); @@ -974,6 +955,9 @@ impl CaConnSet { WithStatusSeriesIdStateInner::UnassigningForConfigChange(..) => { k.value = ChannelStateValue::ToRemove { addr: None }; } + WithStatusSeriesIdStateInner::AddrSearchPlanned { .. } => { + k.value = ChannelStateValue::ToRemove { addr: None }; + } }, }, ChannelStateValue::ToRemove { .. } => {} @@ -990,7 +974,7 @@ impl CaConnSet { } for res in results { let ch = ChannelName::new(res.channel.clone()); - if trigger.contains(&ch.name()) { + if series::dbg::dbg_chn(&ch.name()) { info!("handle_ioc_query_result {res:?}"); } if let Some(chst) = self.channel_states.get_mut(&ch) { @@ -1140,18 +1124,34 @@ impl CaConnSet { } fn handle_channel_create_fail(&mut self, addr: SocketAddr, name: String) -> Result<(), Error> { - trace!("handle_channel_create_fail {addr} {name}"); - let tsnow = SystemTime::now(); + if series::dbg::dbg_chn(&name) { + info!("handle_channel_create_fail {:?} {:?}", name, addr); + } else { + trace!("handle_channel_create_fail {:?} {:?}", name, addr); + } + let stnow = SystemTime::now(); let ch = ChannelName::new(name); if let Some(st1) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(st2) = &mut st1.value { if let ActiveChannelState::WithStatusSeriesId(st3) = st2 { - trace!("handle_channel_create_fail {addr} {ch:?} set to MaybeWrongAddress"); + if series::dbg::dbg_chn(ch.name()) { + info!( + "handle_channel_create_fail {:?} {:?} set to MaybeWrongAddress", + ch, addr + ); + } else { + trace!( + "handle_channel_create_fail {:?} {:?} set to MaybeWrongAddress", + ch, + addr + ); + } bump_backoff(&mut st3.addr_find_backoff); - st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(MaybeWrongAddressState::new( - tsnow, - st3.addr_find_backoff, - )); + let snew = MaybeWrongAddressState::new(stnow, st3.addr_find_backoff); + if series::dbg::dbg_chn(ch.name()) { + info!("handle_channel_create_fail update state {:?}", snew); + } + st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(snew); } } } @@ -1218,6 +1218,7 @@ impl CaConnSet { self.handle_add_channel_with_addr(cmd)?; Ok(()) } + WithStatusSeriesIdStateInner::AddrSearchPlanned { .. } => Ok(()), }, }, ChannelStateValue::ToRemove { .. } => { @@ -1238,7 +1239,8 @@ impl CaConnSet { } fn transition_channels_to_maybe_wrong_address(&mut self, addr: SocketAddr) -> Result<(), Error> { - let tsnow = SystemTime::now(); + // TODO take a "reason" as parameter for status emit. + let stnow = SystemTime::now(); for (ch, st1) in self.channel_states.iter_mut() { match &mut st1.value { ChannelStateValue::Active(st2) => match st2 { @@ -1249,37 +1251,25 @@ impl CaConnSet { match &mut st3.inner { AddrSearchPending { since: _ } => {} WithAddress { addr: addr2, state: _ } => { - if trigger.contains(&ch.name()) { - info!(" connect fail, maybe wrong address for {} {}", addr, ch.name()); - } if SocketAddr::V4(*addr2) == addr { - if trigger.contains(&ch.name()) { - info!("transition_channels_to_maybe_wrong_address AA {addr}"); - } bump_backoff(&mut st3.addr_find_backoff); - st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress( - MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff), - ); - if trigger.contains(&ch.name()) { - info!("transition_channels_to_maybe_wrong_address BB {:?}", st1); + let snew = MaybeWrongAddressState::new(stnow, st3.addr_find_backoff); + st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(snew.clone()); + if series::dbg::dbg_chn(&ch.name()) { + info!( + "transition_channels_to_maybe_wrong_address BB {:?} {:?} {:?} {:?}", + ch, addr, snew, st1 + ); } } else { - if trigger.contains(&ch.name()) { - info!("transition_channels_to_maybe_wrong_address BB {addr}"); - } - bump_backoff(&mut st3.addr_find_backoff); - st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress( - MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff), - ); - if trigger.contains(&ch.name()) { - info!("transition_channels_to_maybe_wrong_address BB {:?}", st1); - } + // nothing to do } } UnknownAddress { since: _ } => {} NoAddress { since: _ } => {} MaybeWrongAddress(_) => {} UnassigningForConfigChange(_) => {} + AddrSearchPlanned { .. } => {} } } }, @@ -1516,34 +1506,14 @@ impl CaConnSet { } } ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner { - WithStatusSeriesIdStateInner::UnknownAddress { since } => { - if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ { - if since.checked_add(UNKNOWN_ADDRESS_STAY).unwrap() < stnow { - if false { - error!("TODO trigger address search from state UnknownAddress"); - if true { - std::process::exit(1); - } - if false { - // TODO - search_pending_count += 1; - st3.inner = - WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; - } - } else { - search_pending_count += 1; - st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; - let qu = IocAddrQuery::uncached(ch.name().into()); - self.find_ioc_query_queue.push_back(qu); - self.stats.ioc_search_start().inc(); - } - } - } + WithStatusSeriesIdStateInner::UnknownAddress { .. } => { + self.find_ioc_queue_set.push_back(ch.clone()); + st3.inner = WithStatusSeriesIdStateInner::AddrSearchPlanned { since: stnow }; } WithStatusSeriesIdStateInner::AddrSearchPending { since } => { let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO); if dt > SEARCH_PENDING_TIMEOUT { - debug!("TODO should receive some error indication instead of timeout for {ch:?}"); + info!("should receive some error indication instead of timeout for {ch:?}"); st3.inner = WithStatusSeriesIdStateInner::NoAddress { since: stnow }; search_pending_count -= 1; } @@ -1582,9 +1552,14 @@ impl CaConnSet { let addr = SocketAddr::V4(*addr_v4); cmd_remove_channel.push((addr, ch.clone())); bump_backoff(&mut st3.addr_find_backoff); - st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress( - MaybeWrongAddressState::new(stnow, st3.addr_find_backoff), - ); + let snew = MaybeWrongAddressState::new(stnow, st3.addr_find_backoff); + if series::dbg::dbg_chn(ch.name()) { + info!( + "check_channel_states update state {:?} {:?} {:?}", + ch, addr, snew + ); + } + st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(snew); let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone()); let (tsev, val) = item.to_ts_val(); let deque = &mut item_deque; @@ -1606,16 +1581,17 @@ impl CaConnSet { } WithStatusSeriesIdStateInner::MaybeWrongAddress(st4) => { if st4.since + st4.backoff_dt < stnow { - if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ { - trace!("try again channel after MaybeWrongAddress"); - if trigger.contains(&ch.name()) { - info!("issue ioc search for {}", ch.name()); - } - search_pending_count += 1; - st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; - let qu = IocAddrQuery::uncached(ch.name().into()); - self.find_ioc_query_queue.push_back(qu); - self.stats.ioc_search_start().inc(); + if series::dbg::dbg_chn(ch.name()) { + info!( + "check_channel_states MaybeWrongAddress set to AddrSearchPlanned {:?}", + ch + ); + } + self.find_ioc_queue_set.push_back(ch.clone()); + st3.inner = WithStatusSeriesIdStateInner::AddrSearchPlanned { since: stnow }; + } else { + if series::dbg::dbg_chn(ch.name()) { + // info!("MaybeWrongAddress back off {:?}", ch); } } } @@ -1624,6 +1600,12 @@ impl CaConnSet { debug!("timeout unassign for config change"); } } + WithStatusSeriesIdStateInner::AddrSearchPlanned { since: _ } => { + // TODO record elapsed from since for metrics + if series::dbg::dbg_chn(ch.name()) { + info!("AddrSearchPlanned {:?} {:?}", ch, search_pending_count); + } + } }, }, ChannelStateValue::ToRemove { .. } => { @@ -1638,6 +1620,34 @@ impl CaConnSet { break; } } + loop { + break if search_pending_count >= CURRENT_SEARCH_PENDING_MAX as _ { + } else { + if let Some(ch) = self.find_ioc_queue_set.pop_front() { + if let Some(st1) = self.channel_states.get_mut(&ch) { + match &mut st1.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::WithStatusSeriesId(st3) => { + if series::dbg::dbg_chn(ch.name()) { + info!("issue ioc search {:?}", ch); + } else { + trace!("issue ioc search {:?}", ch); + } + search_pending_count += 1; + st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow }; + let qu = IocAddrQuery::uncached(ch.name().into()); + self.find_ioc_query_queue.push_back(qu); + self.stats.ioc_search_start().inc(); + } + _ => {} + }, + _ => {} + } + } + continue; + } + }; + } self.storage_insert_queue.push_back(item_deque); for (addr, ch) in cmd_remove_channel { if let Some(g) = self.ca_conn_ress.get_mut(&addr) { @@ -1706,6 +1716,9 @@ impl CaConnSet { WithStatusSeriesIdStateInner::UnassigningForConfigChange(_) => { assigned += 1; } + WithStatusSeriesIdStateInner::AddrSearchPlanned { .. } => { + no_address += 1; + } }, }, ChannelStateValue::ToRemove { .. } => { diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index bfa20fe..5732b10 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -21,7 +21,7 @@ use std::time::Instant; use taskrun::tokio; use tokio::task::JoinHandle; -const SEARCH_DB_PIPELINE_LEN: usize = 2; +const SEARCH_DB_WORKER_CNT: usize = 2; macro_rules! debug_batch { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) } @@ -40,28 +40,35 @@ autoerr::create_error_v1!( fn transform_pgres(rows: Vec) -> VecDeque { let mut ret = VecDeque::new(); for row in rows { - let ch: Result = row.try_get(0); - if let Ok(ch) = ch { - if let Some(addr) = row.get::<_, Option>(1) { - let addr = addr.parse().map_or(None, |x| Some(x)); - let item = FindIocRes { - channel: ch, - response_addr: None, - addr, - dt: Duration::from_millis(0), - }; - ret.push_back(item); - } else { - let item = FindIocRes { - channel: ch, - response_addr: None, - addr: None, - dt: Duration::from_millis(0), - }; - ret.push_back(item); + let n: Result = row.try_get(0); + let ch: Result = row.try_get(1); + match (n, ch) { + (Ok(n), Ok(ch)) => { + if let Some(addr) = row.get::<_, Option>(3) { + let addr = addr.parse().map_or(None, |x| Some(x)); + let item = FindIocRes { + channel: ch, + response_addr: None, + addr, + dt: Duration::from_millis(0), + }; + ret.push_back(item); + } else { + let item = FindIocRes { + channel: ch, + response_addr: None, + addr: None, + dt: Duration::from_millis(0), + }; + ret.push_back(item); + } + } + (_, Err(e)) => { + error!("bad string from pg: {}", e); + } + (Err(e), _) => { + error!("bad int from pg: {}", e); } - } else if let Err(e) = ch { - error!("bad string from pg: {e:?}"); } } ret @@ -110,14 +117,10 @@ async fn finder_worker( stats: Arc, ) -> Result<(), Error> { // TODO do something with join handle - let (batch_rx, jh_batch) = batchtools::batcher::batch( - SEARCH_BATCH_MAX, - Duration::from_millis(200), - SEARCH_DB_PIPELINE_LEN, - qrx, - ); + let (batch_rx, jh_batch) = + batchtools::batcher::batch(SEARCH_BATCH_MAX, Duration::from_millis(200), SEARCH_DB_WORKER_CNT, qrx); let mut jhs = Vec::new(); - for _ in 0..SEARCH_DB_PIPELINE_LEN { + for _ in 0..SEARCH_DB_WORKER_CNT { let jh = tokio::spawn(finder_worker_single( batch_rx.clone(), tx.clone(), @@ -146,25 +149,37 @@ async fn finder_worker_single( debug!("finder_worker_single make_pg_client"); let (pg, jh) = make_pg_client(&db).await?; let sql = concat!( - "with q1 as (select * from unnest($2::text[]) as unn (ch))", - " select distinct on (tt.facility, tt.channel) tt.channel, tt.addr", - " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.archived = 0 and tt.addr is not null", - " order by tt.facility, tt.channel, tsmod desc", + "with q1 as (select * from unnest($2::int[], $3::text[]) as unn (n, ch))", + " select distinct on (q1.n) q1.n, q1.ch, tt.channel, tt.addr, tt.tsmod", + " from q1 left join ioc_by_channel_log tt", + " on tt.channel = q1.ch and tt.facility = $1 and tt.archived = 0 and tt.addr is not null", + " order by q1.n, tsmod desc", ); let qu_select_multi = pg.prepare(sql).await?; - let mut resdiff = 0; loop { match inp.recv().await { Ok(batch) => { - if batch.iter().filter(|x| crate::dbg_chn(x.name())).next().is_some() { - info!("SEARCHING FOR DBG"); - }; + for e in batch.iter().filter(|x| series::dbg::dbg_chn(x.name())) { + info!("searching database for {:?}", e); + } stats.dbsearcher_batch_recv().inc(); stats.dbsearcher_item_recv().add(batch.len() as _); let ts1 = Instant::now(); + let (batch, pass_through) = batch.into_iter().fold((Vec::new(), Vec::new()), |(mut a, mut b), x| { + if x.use_cache() { + a.push(x); + } else { + b.push(x); + } + (a, b) + }); debug_batch!("run query batch len {}", batch.len()); - let names: Vec<_> = batch.iter().filter(|x| x.use_cache()).map(|x| x.name()).collect(); - let qres = pg.query(&qu_select_multi, &[&backend, &names]).await; + let names: Vec<_> = batch + .iter() + .map(|x| if x.use_cache() { x.name() } else { "---------------" }) + .collect(); + let ns: Vec<_> = names.iter().enumerate().map(|(i, _)| i as i32).collect(); + let qres = pg.query(&qu_select_multi, &[&backend, &ns, &names]).await; let dt = ts1.elapsed(); debug_batch!( "done query batch len {}: {} {:.3}ms", @@ -178,57 +193,42 @@ async fn finder_worker_single( match qres { Ok(rows) => { stats.dbsearcher_select_res_0().add(rows.len() as _); - if rows.len() > batch.len() { + if rows.len() != batch.len() { stats.dbsearcher_select_error_len_mismatch().inc(); - } else if rows.len() < batch.len() { - resdiff += batch.len() - rows.len(); + error!("query result len {} batch len {}", rows.len(), batch.len()); + tokio::time::sleep(Duration::from_millis(1000)).await; + continue; } - let nbatch = batch.len(); - trace_batch!("received results {} resdiff {}", rows.len(), resdiff); let items = transform_pgres(rows); - let mut to_add = Vec::new(); - { - let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect(); - for e in batch { - if !names.contains_key(e.name_string()) { - let item = FindIocRes { - channel: e.name().into(), - response_addr: None, - addr: None, - dt: Duration::from_millis(0), - }; - to_add.push(item); - } + for e in items.iter() { + if series::dbg::dbg_chn(&e.channel) { + info!("found in database {:?}", e); } } let mut items = items; - items.extend(to_add.into_iter()); - let items = items; - for e in &items { - trace!("found in database: {e:?}"); - } - for e in items.iter() { - if crate::dbg_chn(&e.channel) { - info!("FOUND {e:?}"); - } + for e in pass_through { + let x = FindIocRes { + channel: e.name().into(), + response_addr: None, + addr: None, + dt: Duration::from_millis(0), + }; + items.push_back(x); } let items_len = items.len(); - if items_len != nbatch { - stats.dbsearcher_select_error_len_mismatch().inc(); - } match tx.send(items).await { Ok(_) => { stats.dbsearcher_batch_send().inc(); stats.dbsearcher_item_send().add(items_len as _); } Err(e) => { - error!("finder sees: {e}"); + error!("finder sees: {}", e); break; } } } Err(e) => { - error!("finder sees error: {e}"); + error!("finder sees error: {}", e); tokio::time::sleep(Duration::from_millis(1000)).await; } } diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 87b24a2..289fb66 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -97,6 +97,10 @@ pub enum WithStatusSeriesIdStateInner { }, MaybeWrongAddress(MaybeWrongAddressState), UnassigningForConfigChange(UnassigningForConfigChangeState), + AddrSearchPlanned { + #[serde(with = "humantime_serde")] + since: SystemTime, + }, } #[derive(Debug, Clone, Serialize)] @@ -109,7 +113,7 @@ pub struct MaybeWrongAddressState { impl MaybeWrongAddressState { pub fn new(since: SystemTime, backoff_cnt: u32) -> Self { let f = 2. + 60. * (backoff_cnt as f32 / 5.).tanh(); - let dtms = 1e-3 * f; + let dtms = 1e3 * f; Self { since, backoff_dt: Duration::from_millis(dtms as u64), diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index b06007e..cbf3830 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -6,6 +6,7 @@ pub mod linuxhelper; pub mod metrics; pub mod netbuf; pub mod polltimer; +pub mod queueset; pub mod ratelimit; pub mod rt; #[cfg(test)] @@ -21,8 +22,3 @@ pub fn log_test() { debug!("log-test"); trace!("log-test"); } - -pub fn dbg_chn(chn: &str) -> bool { - let chns = ["SINEG01:QE-B1-OP"]; - chns.contains(&chn) -} diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index de6e389..5fffd30 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -148,10 +148,16 @@ fn system_time_epoch(x: &SystemTime) -> bool { *x == SystemTime::UNIX_EPOCH } +#[derive(Debug, Serialize)] +enum Unreachable { + NoAddress, + MaybeWrongAddress, +} + #[derive(Debug, Serialize)] enum ConnectionState { Connecting, - Unreachable, + Unreachable(Unreachable), Disconnected, Connected, Error, @@ -246,19 +252,29 @@ async fn channel_states_try( states.channels.insert(k, chst); } WithStatusSeriesIdStateInner::NoAddress { .. } => { - let chst = - ChannelState::connecting_addr(st1.config, None, ConnectionState::Unreachable); + let chst = ChannelState::connecting_addr( + st1.config, + None, + ConnectionState::Unreachable(Unreachable::NoAddress), + ); states.channels.insert(k, chst); } WithStatusSeriesIdStateInner::MaybeWrongAddress(..) => { - let chst = - ChannelState::connecting_addr(st1.config, None, ConnectionState::Unreachable); + let chst = ChannelState::connecting_addr( + st1.config, + None, + ConnectionState::Unreachable(Unreachable::MaybeWrongAddress), + ); states.channels.insert(k, chst); } WithStatusSeriesIdStateInner::UnassigningForConfigChange(_) => { let chst = ChannelState::connecting_addr(st1.config, None, ConnectionState::Connecting); states.channels.insert(k, chst); } + WithStatusSeriesIdStateInner::AddrSearchPlanned { .. } => { + let chst = ChannelState::connecting(st1.config); + states.channels.insert(k, chst); + } } } } diff --git a/netfetch/src/queueset.rs b/netfetch/src/queueset.rs new file mode 100644 index 0000000..8b7c1da --- /dev/null +++ b/netfetch/src/queueset.rs @@ -0,0 +1,37 @@ +use hashbrown::HashSet; +use std::collections::VecDeque; +use std::hash::Hash; + +pub struct QueueSet { + queue: VecDeque, + set: HashSet, +} + +impl QueueSet +where + T: Clone + Eq + Hash, +{ + pub fn new() -> Self { + Self { + queue: VecDeque::new(), + set: HashSet::new(), + } + } + + pub fn push_back(&mut self, e: T) { + if self.set.get(&e).is_some() { + } else { + self.set.insert(e.clone()); + self.queue.push_back(e); + } + } + + pub fn pop_front(&mut self) -> Option { + if let Some(x) = self.queue.pop_front() { + self.set.remove(&x); + Some(x) + } else { + None + } + } +}