diff --git a/batchtools/src/batcher.rs b/batchtools/src/batcher.rs index b4e35c7..1f853cb 100644 --- a/batchtools/src/batcher.rs +++ b/batchtools/src/batcher.rs @@ -42,6 +42,7 @@ where } }, Err(e) => { + debug!("batcher timeout rx len {}", rx.len()); let _e: tokio::time::error::Elapsed = e; if all.len() > 0 { do_emit = true; diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 27a8fb2..3df9096 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -32,6 +32,7 @@ pub fn main() -> Result<(), Error> { scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf).await? } SubCmd::ChannelAccess(k) => match k { + #[cfg(DISABLED)] ChannelAccess::CaSearch(k) => { info!("daqingest version {}", clap::crate_version!()); let (conf, channels) = parse_config(k.config.into()).await?; diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index bb45de7..5c06c7a 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -3,7 +3,6 @@ pub mod inserthook; use async_channel::Receiver; use async_channel::Sender; -use async_channel::WeakReceiver; use async_channel::WeakSender; use err::Error; use log::*; @@ -13,7 +12,6 @@ use netfetch::ca::connset::CaConnSetItem; use netfetch::conf::CaIngestOpts; use netfetch::daemon_common::Channel; use netfetch::daemon_common::DaemonEvent; -use netfetch::metrics::ExtraInsertsConf; use netfetch::metrics::StatsSet; use netfetch::throttletrace::ThrottleTrace; use netpod::Database; @@ -21,18 +19,10 @@ use netpod::ScyllaConfig; use scywr::insertworker::InsertWorkerOpts; use scywr::insertworker::Ttls; use scywr::iteminsertqueue as scywriiq; -use scywr::store::DataStore; use scywriiq::QueryItem; -use serde::Serialize; -use series::ChannelStatusSeriesId; -use series::SeriesId; use stats::DaemonStats; use stats::InsertWorkerStats; use stats::SeriesByChannelStats; -use std::collections::BTreeMap; -use std::collections::VecDeque; -use std::net::SocketAddr; -use std::net::SocketAddrV4; use std::sync::atomic; use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; @@ -43,9 +33,6 @@ use std::time::SystemTime; use taskrun::tokio; use tokio::task::JoinHandle; -const CA_CONN_INSERT_QUEUE_MAX: usize = 256; - -const CHANNEL_CHECK_INTERVAL: Duration = Duration::from_millis(5000); const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000); const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000); @@ -98,10 +85,11 @@ pub struct Daemon { // TODO should be a stats object? 
insert_workers_running: AtomicU64, query_item_tx_weak: WeakSender<QueryItem>, + connset_health_lat_ema: f32, } impl Daemon { - pub async fn new(opts: DaemonOpts) -> Result<Self, Error> { + pub async fn new(opts: DaemonOpts, ingest_opts: CaIngestOpts) -> Result<Self, Error> { let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32); let series_by_channel_stats = Arc::new(SeriesByChannelStats::new()); @@ -126,7 +114,7 @@ impl Daemon { opts.local_epics_hostname.clone(), query_item_tx, channel_info_query_tx, - opts.pgconf.clone(), + ingest_opts, ); // TODO remove @@ -230,6 +218,7 @@ impl Daemon { connset_status_last: CheckPeriodic::Waiting(Instant::now()), insert_workers_running: AtomicU64::new(0), query_item_tx_weak, + connset_health_lat_ema: 0., }; Ok(ret) } @@ -242,18 +231,17 @@ impl Daemon { match &self.connset_status_last { CheckPeriodic::Waiting(since) => { if *since + Duration::from_millis(5000) < ts1 { - debug!("======================================== issue health check CaConn"); self.connset_ctrl.check_health().await?; self.connset_status_last = CheckPeriodic::Ongoing(ts1); - if let Some(tx) = self.query_item_tx_weak.upgrade() { - info!("query_item_tx len {}", tx.len()); - } } } CheckPeriodic::Ongoing(since) => { let dt = ts1.saturating_duration_since(*since); if dt > Duration::from_millis(2000) { - error!("======================================== CaConnSet has not reported health status since {:.0}", dt.as_secs_f32() * 1e3); + error!( + "CaConnSet has not reported health status since {:.0} ms", + dt.as_secs_f32() * 1e3 + ); } } } @@ -289,7 +277,7 @@ impl Daemon { if dt > Duration::from_millis(500) { info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3); } - if tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= PRINT_STATUS_INTERVAL { + if false && tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= PRINT_STATUS_INTERVAL { self.last_status_print = tsnow; info!( "{:8} {:8} {:8} : {:8} : {:8} {:8} : {:10}", @@ -382,11 +370,17 @@ impl Daemon { let dt2 = ts3.duration_since(ts2).as_secs_f32() * 1e3; match &self.connset_status_last { CheckPeriodic::Waiting(_since) => { - error!("======================================== received CaConnSet health report without having asked {dt1:.0} ms {dt2:.0} ms"); + error!("received CaConnSet health report without having asked {dt1:.0} ms {dt2:.0} ms"); } CheckPeriodic::Ongoing(since) => { + // TODO insert response time as series to scylla. 
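+ // Exponential moving average of the health-check round trip (smoothing factor 0.2); the result feeds the connset_health_lat_ema stat below.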
+ let dtsince = ts3.duration_since(*since).as_secs_f32() * 1e6; + { + let v = &mut self.connset_health_lat_ema; + *v += (dtsince - *v) * 0.2; + self.stats.connset_health_lat_ema().set(*v as _); + } + // debug!("======================================== received CaConnSet healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms"); self.connset_status_last = CheckPeriodic::Waiting(ts3); } } @@ -604,7 +598,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> insert_scylla_sessions: opts.insert_scylla_sessions(), insert_frac: insert_frac.clone(), }; - let daemon = Daemon::new(opts2).await?; + let daemon = Daemon::new(opts2, opts.clone()).await?; let tx = daemon.tx.clone(); let daemon_stats = daemon.stats().clone(); let connset_cmd_tx = daemon.connset_ctrl.sender().clone(); @@ -622,6 +616,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> daemon.connset_ctrl.ca_proto_stats().clone(), daemon.insert_worker_stats.clone(), daemon.series_by_channel_stats.clone(), + daemon.connset_ctrl.ioc_finder_stats().clone(), insert_frac, ); let fut = @@ -640,7 +635,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> Ok(()) => {} Err(_) => break, } - thr_msg.trigger("sent ChannelAdd", &[&i as &_]); + thr_msg.trigger("daemon sent ChannelAdd", &[&i as &_]); i += 1; } debug!("{} configured channels applied", channels.len()); diff --git a/daqingest/src/daemon/inserthook.rs b/daqingest/src/daemon/inserthook.rs index 45b8eb2..f7e7786 100644 --- a/daqingest/src/daemon/inserthook.rs +++ b/daqingest/src/daemon/inserthook.rs @@ -31,18 +31,17 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver, tx: Send Shape::Wave(_) => 1, Shape::Image(_, _) => 2, }; - if let ScalarType::STRING = item.scalar_type { - histo - .entry(item.series.clone()) - .and_modify(|(c, msp, lsp, pulse, _shape_kind)| { - *c += 1; - *msp = item.ts_msp; - *lsp = item.ts_lsp; - *pulse = item.pulse; - // TODO should check that shape_kind stays the same. - }) - .or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind)); - } + if let ScalarType::STRING = item.scalar_type {} + histo + .entry(item.series.clone()) + .and_modify(|(c, msp, lsp, pulse, _shape_kind)| { + *c += 1; + *msp = item.ts_msp; + *lsp = item.ts_lsp; + *pulse = item.pulse; + // TODO should check that shape_kind stays the same. + }) + .or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind)); } _ => {} } diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index ae1c02f..2332ad9 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -79,6 +79,7 @@ pub struct BsreadDump { #[derive(Debug, Parser)] pub enum ChannelAccess { CaIngest(CaConfig), + #[cfg(DISABLED)] CaSearch(CaSearch), } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 1ddc260..53d0e70 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -58,6 +58,8 @@ use std::time::SystemTime; use taskrun::tokio; use tokio::net::TcpStream; +const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000); + #[allow(unused)] macro_rules! 
trace2 { ($($arg:tt)*) => { @@ -144,8 +146,7 @@ struct Cid(pub u32); #[derive(Clone, Debug)] enum ChannelError { - #[allow(unused)] - NoSuccess, + CreateChanFail, } #[derive(Clone, Debug)] @@ -261,8 +262,9 @@ impl ChannelState { } enum CaConnState { - Unconnected, + Unconnected(Instant), Connecting( + Instant, SocketAddrV4, Pin, tokio::time::error::Elapsed>> + Send>>, ), @@ -277,8 +279,8 @@ enum CaConnState { impl fmt::Debug for CaConnState { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { match self { - Self::Unconnected => write!(fmt, "Unconnected"), - Self::Connecting(arg0, _) => fmt.debug_tuple("Connecting").field(arg0).finish(), + Self::Unconnected(since) => fmt.debug_tuple("Unconnected").field(since).finish(), + Self::Connecting(since, addr, _) => fmt.debug_tuple("Connecting").field(since).field(addr).finish(), Self::Init => write!(fmt, "Init"), Self::Listen => write!(fmt, "Listen"), Self::PeerReady => write!(fmt, "PeerReady"), @@ -425,7 +427,9 @@ pub enum CaConnEventValue { EchoTimeout, ConnCommandResult(ConnCommandResult), QueryItem(QueryItem), + ChannelCreateFail(String), EndOfStream, + ConnectFail, } #[derive(Debug)] @@ -497,7 +501,6 @@ pub struct CaConn { ioc_ping_last: Instant, ioc_ping_start: Option, storage_insert_sender: SenderPolling, - cmd_res_queue: VecDeque, ca_conn_event_out_queue: VecDeque, channel_info_query_queue: VecDeque, channel_info_query_sending: SenderPolling, @@ -528,7 +531,7 @@ impl CaConn { Self { opts, backend, - state: CaConnState::Unconnected, + state: CaConnState::Unconnected(Instant::now()), ticker: Self::new_self_ticker(), proto: None, cid_store: CidStore::new(), @@ -552,7 +555,6 @@ impl CaConn { ioc_ping_last: Instant::now(), ioc_ping_start: None, storage_insert_sender: SenderPolling::new(storage_insert_tx), - cmd_res_queue: VecDeque::new(), ca_conn_event_out_queue: VecDeque::new(), channel_info_query_queue: VecDeque::new(), channel_info_query_sending: SenderPolling::new(channel_info_query_tx), @@ -581,7 +583,34 @@ impl CaConn { fn trigger_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) { self.state = CaConnState::Shutdown; self.proto = None; + match &channel_reason { + ChannelStatusClosedReason::ShutdownCommand => {} + ChannelStatusClosedReason::ChannelRemove => {} + ChannelStatusClosedReason::ProtocolError => {} + ChannelStatusClosedReason::FrequencyQuota => {} + ChannelStatusClosedReason::BandwidthQuota => {} + ChannelStatusClosedReason::InternalError => {} + ChannelStatusClosedReason::IocTimeout => {} + ChannelStatusClosedReason::NoProtocol => {} + ChannelStatusClosedReason::ProtocolDone => {} + ChannelStatusClosedReason::ConnectFail => { + debug!("emit status ConnectFail"); + let item = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::ConnectFail, + }; + self.ca_conn_event_out_queue.push_back(item); + } + } self.channel_state_on_shutdown(channel_reason); + let addr = self.remote_addr_dbg.clone(); + self.insert_item_queue + .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { + ts: SystemTime::now(), + addr, + // TODO map to appropriate status + status: ConnectionStatus::Closing, + })); } fn cmd_check_health(&mut self) { @@ -606,7 +635,11 @@ impl CaConn { id: ConnCommandResult::make_id(), kind: ConnCommandResultKind::CheckHealth(health), }; - self.cmd_res_queue.push_back(res); + let item = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::ConnCommandResult(res), + }; + self.ca_conn_event_out_queue.push_back(item); } fn cmd_find_channel(&self, pattern: &str) { @@ -688,16 +721,17 @@ impl 
CaConn { trace2!("handle_series_lookup_result {res:?}"); match res { Ok(res) => { - let series = res.series.into_inner(); - let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: SystemTime::now(), - series: series.clone(), - status: ChannelStatus::Opened, - }); - self.insert_item_queue.push_back(item); if let Some(cid) = self.cid_by_name.get(&res.channel) { if let Some(chst) = self.channels.get(cid) { if let ChannelState::FetchingSeriesId(st2) = chst { + let cssid = st2.cssid.clone(); + let series = res.series.into_inner(); + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: SystemTime::now(), + cssid, + status: ChannelStatus::Opened, + }); + self.insert_item_queue.push_back(item); let cid = st2.cid.clone(); let sid = st2.sid; let data_type = st2.data_type; @@ -805,17 +839,35 @@ impl CaConn { ) } + pub fn channel_remove(&mut self, channel: String) { + Self::channel_remove_expl( + channel, + &mut self.channels, + &mut self.cid_by_name, + &mut self.name_by_cid, + &mut self.cid_store, + &mut self.time_binners, + ) + } + + fn channel_remove_by_cid(&mut self, cid: Cid) { + self.channels.remove(&cid); + self.name_by_cid.remove(&cid); + self.time_binners.remove(&cid); + self.cid_by_name.retain(|_, v| v != &cid); + } + fn channel_remove_expl( - channel: String, + name: String, channels: &mut BTreeMap, cid_by_name: &mut BTreeMap, name_by_cid: &mut BTreeMap, cid_store: &mut CidStore, time_binners: &mut BTreeMap, ) { - let cid = Self::cid_by_name_expl(&channel, cid_by_name, name_by_cid, cid_store); + let cid = Self::cid_by_name_expl(&name, cid_by_name, name_by_cid, cid_store); if channels.contains_key(&cid) { - warn!("TODO actually cause the channel to get closed and removed {}", channel); + warn!("TODO actually cause the channel to get closed and removed {}", name); } { let a: Vec<_> = cid_by_name @@ -833,17 +885,6 @@ impl CaConn { time_binners.remove(&cid); } - pub fn channel_remove(&mut self, channel: String) { - Self::channel_remove_expl( - channel, - &mut self.channels, - &mut self.cid_by_name, - &mut self.name_by_cid, - &mut self.cid_store, - &mut self.time_binners, - ) - } - fn cid_by_name_expl( name: &str, cid_by_name: &mut BTreeMap, @@ -887,10 +928,10 @@ impl CaConn { ChannelState::FetchingSeriesId(..) => { *chst = ChannelState::Ended; } - ChannelState::Created(series, ..) => { + ChannelState::Created(series, st2) => { let item = QueryItem::ChannelStatus(ChannelStatusItem { ts: SystemTime::now(), - series: series.clone(), + cssid: st2.cssid.clone(), status: ChannelStatus::Closed(channel_reason.clone()), }); self.insert_item_queue.push_back(item); @@ -920,7 +961,8 @@ impl CaConn { self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout); } } else { - if self.ioc_ping_last.elapsed() > Duration::from_millis(20000) { + // TODO randomize delay a bit + if self.ioc_ping_last.elapsed() > Duration::from_millis(120000) { if let Some(proto) = &mut self.proto { self.stats.ping_start().inc(); self.ioc_ping_start = Some(Instant::now()); @@ -1470,6 +1512,8 @@ impl CaConn { let cssid = match ch_s { ChannelState::Creating { cssid, .. } => cssid.clone(), _ => { + // TODO handle in better way: + // Remove channel and emit notice that channel is removed with reason. let e = Error::with_msg_no_trace("handle_peer_ready bad state"); return Ready(Some(Err(e))); } @@ -1522,10 +1566,6 @@ impl CaConn { let _ = ts1; res? 
} - CaMsgTy::Error(e) => { - warn!("channel access error message {e:?}"); - } - CaMsgTy::AccessRightsRes(_) => {} CaMsgTy::Echo => { // let addr = &self.remote_addr_dbg; if let Some(started) = self.ioc_ping_start { @@ -1553,8 +1593,30 @@ impl CaConn { self.ioc_ping_last = Instant::now(); self.ioc_ping_start = None; } - CaMsgTy::CreateChanFail(_) => { - // TODO handle CreateChanFail + CaMsgTy::CreateChanFail(msg) => { + // TODO + // Here, must indicate that the address could be wrong! + // The channel status must be "Fail" so that ConnSet can decide to re-search. + // TODO how to transition the channel state? Any invariants or simply write to the map? + let cid = Cid(msg.cid); + if let Some(name) = self.name_by_cid.get(&cid) { + debug!("queue event to notify of channel create fail {name}"); + let item = CaConnEvent { + ts: tsnow, + value: CaConnEventValue::ChannelCreateFail(name.into()), + }; + self.ca_conn_event_out_queue.push_back(item); + } + self.channel_remove_by_cid(cid); + warn!("CaConn sees: {msg:?}"); + } + CaMsgTy::Error(msg) => { + warn!("CaConn sees: {msg:?}"); + } + CaMsgTy::AccessRightsRes(msg) => { + if false { + warn!("CaConn sees: {msg:?}"); + } } _ => { warn!("Received unexpected protocol message {:?}", k); @@ -1597,17 +1659,17 @@ impl CaConn { fn handle_conn_state(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> { use Poll::*; match &mut self.state { - CaConnState::Unconnected => { + CaConnState::Unconnected(_since) => { let addr = self.remote_addr_dbg.clone(); // TODO issue a TCP-connect event (and later a "connected") trace!("create tcp connection to {:?}", (addr.ip(), addr.port())); let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr)); - self.state = CaConnState::Connecting(addr, Box::pin(fut)); + self.state = CaConnState::Connecting(Instant::now(), addr, Box::pin(fut)); Ok(Ready(Some(()))) } - CaConnState::Connecting(ref addr, ref mut fut) => { + CaConnState::Connecting(_since, addr, fut) => { match fut.poll_unpin(cx) { Ready(connect_result) => { match connect_result { @@ -1630,33 +1692,44 @@ impl CaConn { self.proto = Some(proto); Ok(Ready(Some(()))) } - Ok(Err(_e)) => { - // TODO log with exponential backoff - let addr = addr.clone(); - self.insert_item_queue - .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { - ts: SystemTime::now(), - addr, - status: ConnectionStatus::ConnectError, - })); - let dt = self.backoff_next(); - self.state = CaConnState::Wait(wait_fut(dt)); - self.proto = None; + Ok(Err(e)) => { + trace!("error connect to {addr} {e}"); + if true { + let addr = addr.clone(); + self.insert_item_queue.push_back(QueryItem::ConnectionStatus( + ConnectionStatusItem { + ts: SystemTime::now(), + addr, + status: ConnectionStatus::ConnectError, + }, + )); + self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail); + } else { + // TODO log with exponential backoff + let dt = self.backoff_next(); + self.state = CaConnState::Wait(wait_fut(dt)); + self.proto = None; + } Ok(Ready(Some(()))) } Err(e) => { // TODO log with exponential backoff - trace!("timeout during connect to {addr:?} {e:?}"); - let addr = addr.clone(); - self.insert_item_queue - .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { - ts: SystemTime::now(), - addr, - status: ConnectionStatus::ConnectTimeout, - })); - let dt = self.backoff_next(); - self.state = CaConnState::Wait(wait_fut(dt)); - self.proto = None; + trace!("timeout connect to {addr} {e}"); + if true { + let addr = addr.clone(); + 
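+ // Record the ConnectTimeout for storage, then shut this CaConn down; the ConnSet reacts to the resulting ConnectFail event by marking the affected channels MaybeWrongAddress and re-searching them.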
self.insert_item_queue.push_back(QueryItem::ConnectionStatus( + ConnectionStatusItem { + ts: SystemTime::now(), + addr, + status: ConnectionStatus::ConnectTimeout, + }, + )); + self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail); + } else { + let dt = self.backoff_next(); + self.state = CaConnState::Wait(wait_fut(dt)); + self.proto = None; + } Ok(Ready(Some(()))) } } @@ -1707,7 +1780,7 @@ impl CaConn { trace4!("Wait"); match inst.poll_unpin(cx) { Ready(_) => { - self.state = CaConnState::Unconnected; + self.state = CaConnState::Unconnected(Instant::now()); self.proto = None; Ok(Ready(Some(()))) } @@ -1784,7 +1857,22 @@ impl CaConn { fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { // debug!("tick CaConn {}", self.remote_addr_dbg); + let tsnow = Instant::now(); let this = self.get_mut(); + match &this.state { + CaConnState::Unconnected(since) => {} + CaConnState::Connecting(since, _addr, _) => { + if *since + CONNECTING_TIMEOUT < tsnow { + debug!("CONNECTION TIMEOUT"); + } + } + CaConnState::Init => {} + CaConnState::Listen => {} + CaConnState::PeerReady => {} + CaConnState::Wait(_) => {} + CaConnState::Shutdown => {} + CaConnState::EndOfStream => {} + } if false { for (_, tb) in this.time_binners.iter_mut() { let iiq = &mut this.insert_item_queue; @@ -1794,10 +1882,18 @@ impl CaConn { Ok(()) } + fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> { + Ok(()) + } + + fn queues_out_flushed(&self) -> bool { + self.queues_async_out_flushed() && self.ca_conn_event_out_queue.is_empty() + } + fn queues_async_out_flushed(&self) -> bool { // self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle() // TODO re-enable later - true + self.insert_item_queue.is_empty() && self.storage_insert_sender.is_idle() } fn attempt_flush_storage_queue(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { @@ -1877,23 +1973,9 @@ impl Stream for CaConn { if let CaConnState::EndOfStream = self.state { break Ready(None); } - if let Some(item) = self.cmd_res_queue.pop_front() { - let item = CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::ConnCommandResult(item), - }; - break Ready(Some(Ok(item))); - } if let Some(item) = self.ca_conn_event_out_queue.pop_front() { break Ready(Some(Ok(item))); } - // if let Some(item) = self.insert_item_queue.pop_front() { - // let ev = CaConnEvent { - // ts: Instant::now(), - // value: CaConnEventValue::QueryItem(item), - // }; - // break Ready(Some(Ok(ev))); - // } match self.as_mut().handle_own_ticker(cx) { Ok(Ready(())) => { @@ -1954,11 +2036,12 @@ impl Stream for CaConn { } break if self.is_shutdown() { - if self.queues_async_out_flushed() { + if self.queues_out_flushed() { debug!("end of stream {}", self.remote_addr_dbg); self.state = CaConnState::EndOfStream; Ready(None) } else { + debug!("queues_out_flushed false"); if have_progress { self.stats.ca_conn_poll_reloop().inc(); continue; diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index cb44e50..59b7aba 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -13,6 +13,7 @@ use crate::ca::conn::CaConnOpts; use crate::ca::conn::ConnCommand; use crate::ca::statemap::CaConnState; use crate::ca::statemap::WithAddressState; +use crate::conf::CaIngestOpts; use crate::daemon_common::Channel; use crate::errconv::ErrConv; use crate::rt::JoinHandle; @@ -32,6 +33,9 @@ use futures_util::Stream; use futures_util::StreamExt; use log::*; use netpod::Database; +use 
scywr::iteminsertqueue::ChannelInfoItem; +use scywr::iteminsertqueue::ChannelStatus; +use scywr::iteminsertqueue::ChannelStatusItem; use scywr::iteminsertqueue::QueryItem; use serde::Serialize; use series::ChannelStatusSeriesId; @@ -45,6 +49,7 @@ use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; use stats::CaConnSetStats; use stats::CaConnStats; use stats::CaProtoStats; +use stats::IocFinderStats; use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::SocketAddr; @@ -62,15 +67,13 @@ use taskrun::tokio; const CHECK_CHANS_PER_TICK: usize = 10000; pub const SEARCH_BATCH_MAX: usize = 256; pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4; -const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000); +const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(15000); const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000); +const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(400); const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); -const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000); -const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(8000); -const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(8000); - -// TODO put all these into metrics -static SEARCH_REQ_MARK_COUNT: AtomicUsize = AtomicUsize::new(0); +const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(15000); +const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(30000); +const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(15000); #[allow(unused)] macro_rules! trace2 { @@ -209,6 +212,7 @@ pub struct CaConnSetCtrl { stats: Arc, ca_conn_stats: Arc, ca_proto_stats: Arc, + ioc_finder_stats: Arc, jh: JoinHandle>, } @@ -267,11 +271,38 @@ impl CaConnSetCtrl { pub fn ca_proto_stats(&self) -> &Arc { &self.ca_proto_stats } + + pub fn ioc_finder_stats(&self) -> &Arc { + &self.ioc_finder_stats + } } #[derive(Debug)] pub struct IocAddrQuery { - pub name: String, + name: String, + use_cache: bool, +} + +impl IocAddrQuery { + pub fn cached(name: String) -> Self { + Self { name, use_cache: true } + } + + pub fn uncached(name: String) -> Self { + Self { name, use_cache: false } + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn name_string(&self) -> &String { + &self.name + } + + pub fn use_cache(&self) -> bool { + self.use_cache + } } struct SeriesLookupSender { @@ -332,14 +363,20 @@ impl CaConnSet { local_epics_hostname: String, storage_insert_tx: Sender, channel_info_query_tx: Sender, - pgconf: Database, + ingest_opts: CaIngestOpts, ) -> CaConnSetCtrl { let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(200); let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200); let (connset_out_tx, connset_out_rx) = async_channel::bounded(200); let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(400); - let (find_ioc_query_tx, ioc_finder_jh) = - super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf); + let ioc_finder_stats = Arc::new(IocFinderStats::new()); + let (find_ioc_query_tx, ioc_finder_jh) = super::finder::start_finder( + find_ioc_res_tx.clone(), + backend.clone(), + ingest_opts, + ioc_finder_stats.clone(), + ) + .unwrap(); let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400); let stats = Arc::new(CaConnSetStats::new()); let ca_proto_stats = Arc::new(CaProtoStats::new()); @@ -388,6 +425,7 @@ impl CaConnSet { stats, ca_conn_stats, ca_proto_stats, + ioc_finder_stats, jh, } } @@ -407,6 +445,9 @@ 
impl CaConnSet { // this.find_ioc_query_tx.receiver_count() // ); debug!("CaConnSet EndOfStream"); + debug!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len()); + this.find_ioc_query_sender.drop(); + debug!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len()); this.ioc_finder_jh .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))??; @@ -443,7 +484,9 @@ impl CaConnSet { self.storage_insert_queue.push_back(item); Ok(()) } + CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x), CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr), + CaConnEventValue::ConnectFail => self.handle_connect_fail(addr), } } @@ -482,6 +525,7 @@ impl CaConnSet { value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { since: SystemTime::now(), }), + running_cmd_id: None, }); let item = ChannelInfoQuery { backend: cmd.backend, @@ -510,12 +554,12 @@ impl CaConnSet { *chst2 = ActiveChannelState::WithStatusSeriesId { status_series_id: cmd.cssid, state: WithStatusSeriesIdState { - inner: WithStatusSeriesIdStateInner::SearchPending { + inner: WithStatusSeriesIdStateInner::AddrSearchPending { since: SystemTime::now(), }, }, }; - let qu = IocAddrQuery { name: cmd.name }; + let qu = IocAddrQuery::cached(cmd.name); self.find_ioc_query_queue.push_back(qu); self.stats.ioc_search_start().inc(); } else { @@ -593,7 +637,7 @@ impl CaConnSet { WithStatusSeriesIdStateInner::UnknownAddress { .. } => { k.value = ChannelStateValue::ToRemove { addr: None }; } - WithStatusSeriesIdStateInner::SearchPending { .. } => { + WithStatusSeriesIdStateInner::AddrSearchPending { .. } => { k.value = ChannelStateValue::ToRemove { addr: None }; } WithStatusSeriesIdStateInner::WithAddress { addr, state: _ } => { @@ -604,6 +648,9 @@ impl CaConnSet { WithStatusSeriesIdStateInner::NoAddress { .. } => { k.value = ChannelStateValue::ToRemove { addr: None }; } + WithStatusSeriesIdStateInner::MaybeWrongAddress { .. } => { + k.value = ChannelStateValue::ToRemove { addr: None }; + } }, }, ChannelStateValue::ToRemove { .. 
} => {} @@ -616,7 +663,6 @@ impl CaConnSet { if self.shutdown_stopping { return Ok(()); } - self.stats.ioc_addr_found().inc(); trace3!("handle_ioc_query_result"); for e in res { let ch = Channel::new(e.channel.clone()); @@ -628,6 +674,7 @@ impl CaConnSet { } = ast { if let Some(addr) = e.addr { + self.stats.ioc_addr_found().inc(); trace3!("ioc found {e:?}"); let add = ChannelAddWithAddr { backend: self.backend.clone(), @@ -643,17 +690,21 @@ impl CaConnSet { }; self.handle_add_channel_with_addr(add)?; } else { + self.stats.ioc_addr_not_found().inc(); trace3!("ioc not found {e:?}"); let since = SystemTime::now(); state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since }; } } else { + self.stats.ioc_addr_result_for_unknown_channel().inc(); warn!("TODO got address but no longer active"); } } else { + self.stats.ioc_addr_result_for_unknown_channel().inc(); warn!("TODO got address but no longer active"); } } else { + self.stats.ioc_addr_result_for_unknown_channel().inc(); warn!("ioc addr lookup done but channel no longer here"); } } @@ -664,10 +715,10 @@ impl CaConnSet { if self.shutdown_stopping { return Ok(()); } - self.thr_msg_storage_len - .trigger("msg", &[&self.storage_insert_sender.len()]); - debug!("TODO handle_check_health"); - + if false { + self.thr_msg_storage_len + .trigger("connset handle_check_health", &[&self.storage_insert_sender.len()]); + } self.check_channel_states()?; // Trigger already the next health check, but use the current data that we have. @@ -714,6 +765,7 @@ impl CaConnSet { } debug!("handle_shutdown"); self.shutdown_stopping = true; + self.find_ioc_res_rx.close(); self.channel_info_query_sender.drop(); self.find_ioc_query_sender.drop(); for (_addr, res) in self.ca_conn_ress.iter() { @@ -775,6 +827,25 @@ impl CaConnSet { Ok(()) } + fn handle_channel_create_fail(&mut self, addr: SocketAddr, name: String) -> Result<(), Error> { + trace!("handle_channel_create_fail {addr} {name}"); + let tsnow = SystemTime::now(); + let ch = Channel::new(name); + if let Some(st1) = self.channel_states.inner().get_mut(&ch) { + if let ChannelStateValue::Active(st2) = &mut st1.value { + if let ActiveChannelState::WithStatusSeriesId { + status_series_id: _, + state: st3, + } = st2 + { + trace!("handle_channel_create_fail {addr} {ch:?} set to MaybeWrongAddress"); + st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow }; + } + } + } + Ok(()) + } + fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> { trace2!("handle_ca_conn_eos {addr}"); if let Some(e) = self.ca_conn_ress.remove(&addr) { @@ -789,6 +860,28 @@ impl CaConnSet { Ok(()) } + fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> { + trace2!("handle_connect_fail {addr}"); + let tsnow = SystemTime::now(); + for (ch, st1) in self.channel_states.inner().iter_mut() { + match &mut st1.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::Init { since: _ } => {} + ActiveChannelState::WaitForStatusSeriesId { since: _ } => {} + ActiveChannelState::WithStatusSeriesId { + status_series_id: _, + state: st3, + } => { + debug!("connect fail, maybe wrong address for {}", ch.id()); + st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow }; + } + }, + ChannelStateValue::ToRemove { addr: _ } => {} + } + } + Ok(()) + } + fn remove_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> { Ok(()) } @@ -828,7 +921,7 @@ impl CaConnSet { let conn_stats = conn.stats(); let tx1 = self.ca_conn_res_tx.clone(); let tx2 = 
self.storage_insert_tx.clone(); - let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, tx2, addr)); + let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, tx2, addr, self.stats.clone())); let ca_conn_res = CaConnRes { state: CaConnState::new(CaConnStateValue::Fresh), sender: conn_tx, @@ -844,30 +937,52 @@ impl CaConnSet { tx1: Sender<(SocketAddr, CaConnEvent)>, tx2: Sender, addr: SocketAddr, + stats: Arc, ) -> Result<(), Error> { + stats.ca_conn_task_begin().inc(); trace2!("ca_conn_consumer begin {}", addr); - let stats = conn.stats(); + let connstats = conn.stats(); let mut conn = conn; let mut ret = Ok(()); while let Some(item) = conn.next().await { match item { Ok(item) => { - stats.conn_item_count.inc(); + connstats.conn_item_count.inc(); match item.value { CaConnEventValue::QueryItem(x) => { - tx2.send(x).await; + if let Err(_) = tx2.send(x).await { + break; + } } CaConnEventValue::None => { - tx1.send((addr, item)).await; + if let Err(_) = tx1.send((addr, item)).await { + break; + } } CaConnEventValue::EchoTimeout => { - tx1.send((addr, item)).await; + if let Err(_) = tx1.send((addr, item)).await { + break; + } } CaConnEventValue::ConnCommandResult(_) => { - tx1.send((addr, item)).await; + if let Err(_) = tx1.send((addr, item)).await { + break; + } + } + CaConnEventValue::ChannelCreateFail(_) => { + if let Err(_) = tx1.send((addr, item)).await { + break; + } } CaConnEventValue::EndOfStream => { - tx1.send((addr, item)).await; + if let Err(_) = tx1.send((addr, item)).await { + break; + } + } + CaConnEventValue::ConnectFail => { + if let Err(_) = tx1.send((addr, item)).await { + break; + } } } } @@ -888,9 +1003,25 @@ impl CaConnSet { )) .await?; trace2!("ca_conn_consumer signaled {}", addr); + stats.ca_conn_task_done().inc(); ret } + fn push_channel_status(&mut self, item: ChannelStatusItem) -> Result<(), Error> { + if false { + let _ = ChannelInfoItem { + ts_msp: todo!(), + series: todo!(), + ivl: todo!(), + interest: todo!(), + evsize: todo!(), + }; + } + let item = QueryItem::ChannelStatus(item); + self.storage_insert_queue.push_back(item); + Ok(()) + } + #[allow(unused)] async fn __enqueue_command_to_all(&self, cmdgen: F) -> Result, Error> where @@ -1024,6 +1155,7 @@ impl CaConnSet { let (mut search_pending_count,) = self.update_channel_state_counts(); let mut cmd_remove_channel = Vec::new(); let mut cmd_add_channel = Vec::new(); + let mut channel_status_items = Vec::new(); let k = self.chan_check_next.take(); let it = if let Some(last) = k { trace!("check_chans start at {:?}", last); @@ -1054,17 +1186,17 @@ impl CaConnSet { state, } => match &mut state.inner { WithStatusSeriesIdStateInner::UnknownAddress { since } => { - let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); - if dt > UNKNOWN_ADDRESS_STAY { - //info!("UnknownAddress {} {:?}", i, ch); - if (search_pending_count as usize) < CURRENT_SEARCH_PENDING_MAX { - search_pending_count += 1; - state.inner = WithStatusSeriesIdStateInner::SearchPending { since: tsnow }; - SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel); + if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ { + if since.checked_add(UNKNOWN_ADDRESS_STAY).unwrap() < tsnow { + if false { + // TODO + search_pending_count += 1; + state.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: tsnow }; + } } } } - WithStatusSeriesIdStateInner::SearchPending { since } => { + WithStatusSeriesIdStateInner::AddrSearchPending { since } => { let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); if dt > 
SEARCH_PENDING_TIMEOUT { debug!("TODO should receive some error indication instead of timeout for {ch:?}"); @@ -1093,19 +1225,36 @@ impl CaConnSet { Assigned(st4) => { if st4.updated + CHANNEL_HEALTH_TIMEOUT < tsnow { self.stats.channel_health_timeout().inc(); + debug!("health timeout channel {ch:?}"); let addr = SocketAddr::V4(*addr_v4); cmd_remove_channel.push((addr, ch.clone())); - *st3 = WithAddressState::Unassigned { since: tsnow }; + state.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow }; + // *st3 = WithAddressState::Unassigned { since: tsnow }; + let item = + ChannelStatusItem::new_closed_conn_timeout(tsnow, status_series_id.clone()); + channel_status_items.push(item); } } } } WithStatusSeriesIdStateInner::NoAddress { since } => { - let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); - if dt > NO_ADDRESS_STAY { + if *since + NO_ADDRESS_STAY < tsnow { state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow }; } } + WithStatusSeriesIdStateInner::MaybeWrongAddress { since } => { + if *since + MAYBE_WRONG_ADDRESS_STAY < tsnow { + if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ { + search_pending_count += 1; + state.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: tsnow }; + let qu = IocAddrQuery::uncached(ch.id().into()); + self.find_ioc_query_queue.push_back(qu); + self.stats.ioc_search_start().inc(); + } + } + } }, }, ChannelStateValue::ToRemove { .. } => { @@ -1117,6 +1266,9 @@ impl CaConnSet { break; } } + for item in channel_status_items { + self.push_channel_status(item)?; + } for (addr, ch) in cmd_remove_channel { if let Some(g) = self.ca_conn_ress.get_mut(&addr) { let cmd = ConnCommand::channel_remove(ch.id().into()); @@ -1138,6 +1290,7 @@ impl CaConnSet { let mut unassigned = 0; let mut assigned = 0; let mut connected = 0; + let mut maybe_wrong_address = 0; for (_ch, st) in self.channel_states.inner().iter() { match &st.value { ChannelStateValue::Active(st2) => match st2 { @@ -1151,7 +1304,7 @@ impl CaConnSet { WithStatusSeriesIdStateInner::UnknownAddress { .. } => { unknown_address += 1; } - WithStatusSeriesIdStateInner::SearchPending { .. } => { + WithStatusSeriesIdStateInner::AddrSearchPending { .. } => { search_pending += 1; } WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state { @@ -1170,6 +1323,9 @@ impl CaConnSet { WithStatusSeriesIdStateInner::NoAddress { .. } => { no_address += 1; } + WithStatusSeriesIdStateInner::MaybeWrongAddress { .. } => { + maybe_wrong_address += 1; + } }, }, ChannelStateValue::ToRemove { .. 
} => { @@ -1183,6 +1339,7 @@ impl CaConnSet { self.stats.channel_unassigned.set(unassigned); self.stats.channel_assigned.set(assigned); self.stats.channel_connected.set(connected); + self.stats.channel_maybe_wrong_address.set(maybe_wrong_address); (search_pending,) } @@ -1222,9 +1379,11 @@ impl Stream for CaConnSet { self.stats.poll_fn_begin().inc(); loop { self.stats.poll_loop_begin().inc(); - self.thr_msg_poll_1.trigger("CaConnSet::poll", &[]); self.stats.storage_insert_tx_len.set(self.storage_insert_tx.len() as _); + self.stats + .storage_insert_queue_len + .set(self.storage_insert_queue.len() as _); self.stats .channel_info_query_queue_len .set(self.channel_info_query_queue.len() as _); diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index b56ce84..9a68cd7 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -2,23 +2,29 @@ use super::connset::CaConnSetEvent; use super::connset::IocAddrQuery; use super::connset::CURRENT_SEARCH_PENDING_MAX; use super::connset::SEARCH_BATCH_MAX; +use super::search::ca_search_workers_start; use crate::ca::findioc::FindIocRes; use crate::ca::findioc::FindIocStream; +use crate::conf::CaIngestOpts; use crate::daemon_common::DaemonEvent; use async_channel::Receiver; use async_channel::Sender; use dbpg::conn::make_pg_client; +use dbpg::iocindex::IocItem; +use dbpg::iocindex::IocSearchIndexWorker; use dbpg::postgres::Row as PgRow; use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; use log::*; use netpod::Database; +use stats::IocFinderStats; use std::collections::HashMap; use std::collections::VecDeque; use std::net::SocketAddrV4; use std::sync::atomic; use std::sync::atomic::AtomicUsize; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use taskrun::tokio; @@ -30,16 +36,8 @@ const FINDER_BATCH_SIZE: usize = 8; const FINDER_IN_FLIGHT_MAX: usize = 800; const FINDER_TIMEOUT: Duration = Duration::from_millis(100); -// TODO pull out into a stats -static SEARCH_REQ_BATCH_RECV_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_RES_0_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_RES_1_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_RES_2_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_RES_3_COUNT: AtomicUsize = AtomicUsize::new(0); - #[allow(unused)] macro_rules! debug_batch { - // (D$($arg:tt)*) => (); ($($arg:tt)*) => (if false { debug!($($arg)*); }); @@ -47,7 +45,6 @@ macro_rules! debug_batch { #[allow(unused)] macro_rules! 
trace_batch { - // (D$($arg:tt)*) => (); ($($arg:tt)*) => (if false { trace!($($arg)*); }); @@ -83,11 +80,50 @@ fn transform_pgres(rows: Vec) -> VecDeque { ret } +pub fn start_finder( + tx: Sender>, + backend: String, + opts: CaIngestOpts, + stats: Arc, +) -> Result<(Sender, JoinHandle>), Error> { + let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX); + let jh = taskrun::spawn(finder_full(qrx, tx, backend, opts, stats)); + Ok((qtx, jh)) +} + +async fn finder_worker( + qrx: Receiver, + tx: Sender>, + backend: String, + db: Database, + stats: Arc, +) -> Result<(), Error> { + // TODO do something with join handle + let (batch_rx, jh) = batchtools::batcher::batch( + SEARCH_BATCH_MAX, + Duration::from_millis(200), + SEARCH_DB_PIPELINE_LEN, + qrx, + ); + for _ in 0..SEARCH_DB_PIPELINE_LEN { + // TODO use join handle + tokio::spawn(finder_worker_single( + batch_rx.clone(), + tx.clone(), + backend.clone(), + db.clone(), + stats.clone(), + )); + } + Ok(()) +} + async fn finder_worker_single( inp: Receiver>, tx: Sender>, backend: String, db: Database, + stats: Arc, ) -> Result<(), Error> { let (pg, jh) = make_pg_client(&db) .await @@ -106,10 +142,11 @@ async fn finder_worker_single( loop { match inp.recv().await { Ok(batch) => { - SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel); + stats.dbsearcher_batch_recv().inc(); + stats.dbsearcher_item_recv().add(batch.len() as _); let ts1 = Instant::now(); debug_batch!("run query batch len {}", batch.len()); - let names: Vec<_> = batch.iter().map(|x| x.name.as_str()).collect(); + let names: Vec<_> = batch.iter().filter(|x| x.use_cache()).map(|x| x.name()).collect(); let qres = pg.query(&qu_select_multi, &[&backend, &names]).await; let dt = ts1.elapsed(); debug_batch!( @@ -125,30 +162,29 @@ async fn finder_worker_single( out.push_str(", "); } out.push('\''); - out.push_str(&e.name); + out.push_str(e.name()); out.push('\''); } out.push(']'); - eprintln!("VERY SLOW QUERY\n{out}"); + trace!("very slow query\n{out}"); } match qres { Ok(rows) => { + stats.dbsearcher_select_res_0().add(rows.len() as _); if rows.len() > batch.len() { - error!("MORE RESULTS THAN INPUT"); + stats.dbsearcher_select_error_len_mismatch().inc(); } else if rows.len() < batch.len() { resdiff += batch.len() - rows.len(); } let nbatch = batch.len(); trace_batch!("received results {} resdiff {}", rows.len(), resdiff); - SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel); let items = transform_pgres(rows); let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect(); let mut to_add = Vec::new(); for e in batch { - let s = e.name; - if !names.contains_key(&s) { + if !names.contains_key(e.name_string()) { let item = FindIocRes { - channel: s, + channel: e.name().into(), response_addr: None, addr: None, dt: Duration::from_millis(0), @@ -156,17 +192,17 @@ async fn finder_worker_single( to_add.push(item); } } - SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); - SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel); let mut items = items; items.extend(to_add.into_iter()); - if items.len() != nbatch { - error!("STILL NOT MATCHING LEN"); + let items_len = items.len(); + if items_len != nbatch { + stats.dbsearcher_select_error_len_mismatch().inc(); } - SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); - let x = tx.send(items).await; - match x { - Ok(_) => {} + match tx.send(items).await { + Ok(_) => { + stats.dbsearcher_batch_send().inc(); + 
stats.dbsearcher_item_send().add(items_len as _); + } Err(e) => { error!("finder sees: {e}"); break; @@ -182,76 +218,110 @@ async fn finder_worker_single( Err(_e) => break, } } + debug!("finder_worker_single done"); + jh.await?.map_err(|e| Error::from_string(e))?; Ok(()) } -async fn finder_worker( +async fn finder_full( qrx: Receiver, tx: Sender>, backend: String, - db: Database, + opts: CaIngestOpts, + stats: Arc, ) -> Result<(), Error> { - // TODO do something with join handle - let (batch_rx, _jh) = batchtools::batcher::batch( - SEARCH_BATCH_MAX, - Duration::from_millis(200), - SEARCH_DB_PIPELINE_LEN, + let (tx1, rx1) = async_channel::bounded(20); + let jh1 = taskrun::spawn(finder_worker( qrx, - ); - for _ in 0..SEARCH_DB_PIPELINE_LEN { - // TODO use join handle - tokio::spawn(finder_worker_single( - batch_rx.clone(), - tx.clone(), - backend.clone(), - db.clone(), - )); + tx1, + backend, + opts.postgresql_config().clone(), + stats.clone(), + )); + let jh2 = taskrun::spawn(finder_network_if_not_found(rx1, tx, opts.clone(), stats)); + jh1.await??; + jh2.await??; + Ok(()) +} + +async fn finder_network_if_not_found( + mut rx: Receiver>, + tx: Sender>, + opts: CaIngestOpts, + stats: Arc, +) -> Result<(), Error> { + let (net_tx, net_rx, jh, jhs) = ca_search_workers_start(&opts, stats.clone()).await.unwrap(); + let jh2 = taskrun::spawn(process_net_result(net_rx, tx.clone(), opts.clone())); + 'outer: while let Some(item) = rx.next().await { + let mut res = VecDeque::new(); + let mut net = VecDeque::new(); + for e in item { + if e.addr.is_none() { + net.push_back(e.channel); + } else { + res.push_back(e); + } + } + if let Err(_) = tx.send(res).await { + break; + } + for ch in net { + if let Err(_) = net_tx.send(ch).await { + break 'outer; + } + } + } + for jh in jhs { + jh.await??; + } + jh.await??; + jh2.await??; + debug!("finder_network_if_not_found done"); + Ok(()) +} + +async fn process_net_result( + mut net_rx: Receiver, Error>>, + tx: Sender>, + opts: CaIngestOpts, +) -> Result<(), Error> { + const IOC_SEARCH_INDEX_WORKER_COUNT: usize = 1; + let (dbtx, dbrx) = async_channel::bounded(64); + let mut ioc_search_index_worker_jhs = Vec::new(); + let mut index_worker_pg_jh = Vec::new(); + for _ in 0..IOC_SEARCH_INDEX_WORKER_COUNT { + let backend = opts.backend().into(); + let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()).await.unwrap(); + index_worker_pg_jh.push(jh); + let worker = IocSearchIndexWorker::prepare(dbrx.clone(), backend, pg).await.unwrap(); + let jh = tokio::spawn(async move { worker.worker().await }); + ioc_search_index_worker_jhs.push(jh); + } + drop(dbrx); + while let Some(item) = net_rx.next().await { + match item { + Ok(item) => { + for e in item.iter() { + let cacheitem = + IocItem::new(e.channel.clone(), e.response_addr.clone(), e.addr.clone(), e.dt.clone()); + if let Err(_) = dbtx.send(cacheitem).await { + break; + } + } + if let Err(_) = tx.send(item).await { + break; + } + } + Err(e) => { + warn!("error during network search: {e}"); + break; + } + } } Ok(()) } -pub fn start_finder( - tx: Sender>, - backend: String, - db: Database, -) -> (Sender, JoinHandle>) { - let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX); - let jh = taskrun::spawn(finder_worker(qrx, tx, backend, db)); - (qtx, jh) -} - -struct OptFut { - fut: Option, -} - -impl OptFut { - fn empty() -> Self { - Self { fut: None } - } - - fn new(fut: F) -> Self { - Self { fut: Some(fut) } - } - - fn is_enabled(&self) -> bool { - self.fut.is_some() - } -} - -impl futures_util::Future 
for OptFut -where - F: futures_util::Future + std::marker::Unpin, -{ - type Output = ::Output; - - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll { - match self.fut.as_mut() { - Some(fut) => fut.poll_unpin(cx), - None => std::task::Poll::Pending, - } - } -} - +#[cfg(DISABLED)] #[allow(unused)] fn start_finder_ca(tx: Sender, tgts: Vec) -> (Sender, JoinHandle<()>) { let (qtx, qrx) = async_channel::bounded(32); diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index d8468ee..4717957 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -1,12 +1,16 @@ use crate::ca::proto::CaMsg; use crate::ca::proto::CaMsgTy; use crate::ca::proto::HeadInfo; +use crate::throttletrace::ThrottleTrace; +use async_channel::Receiver; use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; +use futures_util::StreamExt; use libc::c_int; use log::*; +use stats::IocFinderStats; use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::Ipv4Addr; @@ -14,6 +18,7 @@ use std::net::SocketAddrV4; use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use std::time::Duration; @@ -21,6 +26,15 @@ use std::time::Instant; use taskrun::tokio; use tokio::io::unix::AsyncFd; +#[allow(unused)] +macro_rules! trace2 { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + struct SockBox(c_int); impl Drop for SockBox { @@ -36,14 +50,26 @@ impl Drop for SockBox { // TODO should be able to get away with non-atomic counters. static BATCH_ID: AtomicUsize = AtomicUsize::new(0); -static SEARCH_ID2: AtomicUsize = AtomicUsize::new(0); +static SEARCH_ID: AtomicUsize = AtomicUsize::new(0); #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] struct BatchId(u32); +impl BatchId { + fn next() -> Self { + Self(BATCH_ID.fetch_add(1, Ordering::AcqRel) as u32) + } +} + #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] struct SearchId(u32); +impl SearchId { + fn next() -> Self { + Self(SEARCH_ID.fetch_add(1, Ordering::AcqRel) as u32) + } +} + struct SearchBatch { ts_beg: Instant, tgts: VecDeque, @@ -62,7 +88,7 @@ pub struct FindIocRes { pub struct FindIocStream { tgts: Vec, - channels_input: VecDeque, + channels_input: Receiver, in_flight: BTreeMap, in_flight_max: usize, bid_by_sid: BTreeMap, @@ -80,16 +106,30 @@ pub struct FindIocStream { sids_done: BTreeMap, result_for_done_sid_count: u64, sleeper: Pin + Send>>, - stop_on_empty_queue: bool, + #[allow(unused)] + thr_msg_0: ThrottleTrace, + #[allow(unused)] + thr_msg_1: ThrottleTrace, + #[allow(unused)] + thr_msg_2: ThrottleTrace, + stats: Arc, } impl FindIocStream { - pub fn new(tgts: Vec, batch_run_max: Duration, in_flight_max: usize, batch_size: usize) -> Self { + pub fn new( + channels_input: Receiver, + tgts: Vec, + #[allow(unused)] blacklist: Vec, + batch_run_max: Duration, + in_flight_max: usize, + batch_size: usize, + stats: Arc, + ) -> Self { let sock = unsafe { Self::create_socket() }.unwrap(); let afd = AsyncFd::new(sock.0).unwrap(); Self { tgts, - channels_input: VecDeque::new(), + channels_input, in_flight: BTreeMap::new(), bid_by_sid: BTreeMap::new(), batch_send_queue: VecDeque::new(), @@ -107,14 +147,13 @@ impl FindIocStream { channels_per_batch: batch_size, batch_run_max, sleeper: Box::pin(tokio::time::sleep(Duration::from_millis(500))), - stop_on_empty_queue: false, + thr_msg_0: ThrottleTrace::new(Duration::from_millis(1000)), 
+ thr_msg_1: ThrottleTrace::new(Duration::from_millis(1000)), + thr_msg_2: ThrottleTrace::new(Duration::from_millis(1000)), + stats, } } - pub fn set_stop_on_empty_queue(&mut self) { - self.stop_on_empty_queue = true; - } - pub fn quick_state(&self) -> String { format!( "channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}", @@ -131,10 +170,6 @@ impl FindIocStream { self.channels_input.len() } - pub fn push(&mut self, x: String) { - self.channels_input.push_back(x); - } - fn buf_and_batch(&mut self, bid: &BatchId) -> Option<(&mut Vec, &mut SearchBatch)> { match self.in_flight.get_mut(bid) { Some(batch) => Some((&mut self.buf1, batch)), @@ -235,7 +270,10 @@ impl FindIocStream { Poll::Ready(Ok(())) } - unsafe fn try_read(sock: i32) -> Poll), Error>> { + unsafe fn try_read( + sock: i32, + stats: &IocFinderStats, + ) -> Poll), Error>> { let mut saddr_mem = [0u8; std::mem::size_of::()]; let mut saddr_len: libc::socklen_t = saddr_mem.len() as _; let mut buf = vec![0u8; 1024]; @@ -255,11 +293,14 @@ impl FindIocStream { return Poll::Ready(Err("FindIocStream can not read".into())); } } else if ec < 0 { + stats.ca_udp_io_error().inc(); error!("unexpected received {ec}"); Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}")))) } else if ec == 0 { + stats.ca_udp_io_empty().inc(); Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}")))) } else { + stats.ca_udp_io_recv().inc(); let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem); let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes()); let src_port = u16::from_be(saddr2.sin_port); @@ -308,13 +349,16 @@ impl FindIocStream { accounted += 16 + hi.payload(); } if accounted != ec as usize { - info!("unaccounted data ec {} accounted {}", ec, accounted); + stats.ca_udp_unaccounted_data().inc(); + debug!("unaccounted data ec {} accounted {}", ec, accounted); } if msgs.len() < 1 { - warn!("received answer without messages"); + stats.ca_udp_warn().inc(); + debug!("received answer without messages"); } if msgs.len() == 1 { - warn!("received answer with single message: {msgs:?}"); + stats.ca_udp_warn().inc(); + debug!("received answer with single message: {msgs:?}"); } let mut good = true; if let CaMsgTy::VersionRes(v) = msgs[0].ty { @@ -323,7 +367,8 @@ impl FindIocStream { good = false; } } else { - debug!("first message is not a version: {:?}", msgs[0].ty); + stats.ca_udp_first_msg_not_version().inc(); + // debug!("first message is not a version: {:?}", msgs[0].ty); // Seems like a bug in many IOCs //good = false; } @@ -337,6 +382,7 @@ impl FindIocStream { } //CaMsgTy::VersionRes(13) => {} _ => { + stats.ca_udp_error().inc(); warn!("try_read: unknown message received {:?}", msg.ty); } } @@ -365,15 +411,15 @@ impl FindIocStream { } } - fn create_in_flight(&mut self) { - let bid = BatchId(BATCH_ID.fetch_add(1, Ordering::AcqRel) as u32); + fn create_in_flight(&mut self, chns: Vec) { + let bid = BatchId::next(); let mut sids = Vec::new(); let mut chs = Vec::new(); - while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 { - let sid = SearchId(SEARCH_ID2.fetch_add(1, Ordering::AcqRel) as u32); + for ch in chns { + let sid = SearchId::next(); self.bid_by_sid.insert(sid.clone(), bid.clone()); sids.push(sid); - chs.push(self.channels_input.pop_front().unwrap()); + chs.push(ch); } let n = chs.len(); let batch = SearchBatch { @@ -385,6 +431,7 @@ impl FindIocStream { }; self.in_flight.insert(bid.clone(), batch); 
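+ // The batch is now in flight; queue its id so that poll_next serializes it and sends it to the search targets.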
self.batch_send_queue.push_back(bid); + self.stats.ca_udp_batch_created().inc(); } fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) { @@ -411,6 +458,7 @@ impl FindIocStream { addr: Some(addr), dt, }; + trace!("udp search response {res:?}"); self.out_queue.push_back(res); } None => { @@ -488,6 +536,28 @@ impl FindIocStream { self.in_flight.remove(&bid); } } + + fn get_input_up_to_batch_max(&mut self, cx: &mut Context) -> Vec { + use Poll::*; + let mut ret = Vec::new(); + loop { + break match self.channels_input.poll_next_unpin(cx) { + Ready(Some(item)) => { + ret.push(item); + if ret.len() < self.channels_per_batch { + continue; + } + } + Ready(None) => {} + Pending => {} + }; + } + ret + } + + fn ready_for_end_of_stream(&self) -> bool { + self.channels_input.is_closed() && self.in_flight.is_empty() && self.out_queue.is_empty() + } } impl Stream for FindIocStream { @@ -495,16 +565,17 @@ impl Stream for FindIocStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + // self.thr_msg_0.trigger("FindIocStream::poll_next", &[]); match self.ping.poll_unpin(cx) { Ready(_) => { self.ping = Box::pin(tokio::time::sleep(Duration::from_millis(200))); - cx.waker().wake_by_ref(); + let _ = self.ping.poll_unpin(cx); } Pending => {} } self.clear_timed_out(); loop { - let mut loop_again = false; + let mut have_progress = false; if self.out_queue.is_empty() == false { let ret = std::mem::replace(&mut self.out_queue, VecDeque::new()); break Ready(Some(Ok(ret))); @@ -514,7 +585,7 @@ impl Stream for FindIocStream { Ready(Ok(mut g)) => match unsafe { Self::try_send(self.sock.0, &self.send_addr, &self.buf1) } { Ready(Ok(())) => { self.buf1.clear(); - loop_again = true; + have_progress = true; } Ready(Err(e)) => { error!("{e:?}"); @@ -522,7 +593,7 @@ impl Stream for FindIocStream { Pending => { g.clear_ready(); warn!("socket seemed ready for write, but is not"); - loop_again = true; + have_progress = true; } }, Ready(Err(e)) => { @@ -546,19 +617,19 @@ impl Stream for FindIocStream { //info!("Serialize and queue {bid:?}"); self.send_addr = tgt.clone(); self.batch_send_queue.push_back(bid); - loop_again = true; + have_progress = true; } None => { self.buf1.clear(); self.batch_send_queue.push_back(bid); - loop_again = true; + have_progress = true; error!("tgtix does not exist"); } } } None => { //info!("Batch exhausted"); - loop_again = true; + have_progress = true; } } } @@ -569,19 +640,37 @@ impl Stream for FindIocStream { } else { warn!("bid {bid:?} from batch send queue not in flight NOT done"); } - loop_again = true; + have_progress = true; } } } None => break, } } - while !self.channels_input.is_empty() && self.in_flight.len() < self.in_flight_max { - self.create_in_flight(); - loop_again = true; + if !self.channels_input.is_closed() { + while self.in_flight.len() < self.in_flight_max { + #[cfg(DISABLED)] + { + let n1 = self.in_flight.len(); + self.thr_msg_1.trigger("FindIocStream while A {}", &[&n1]); + } + let chns = self.get_input_up_to_batch_max(cx); + if chns.len() == 0 { + break; + } else { + #[cfg(DISABLED)] + { + let n1 = self.in_flight.len(); + let n2 = chns.len(); + self.thr_msg_2.trigger("FindIocStream while B {} {}", &[&n1, &n2]); + } + self.create_in_flight(chns); + have_progress = true; + } + } } break match self.afd.poll_read_ready(cx) { - Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0) } { + Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0, &self.stats) } { Ready(Ok((src, res))) => { 
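+ // A datagram decoded into (SearchId, addr) pairs: handle_result matches them to in-flight batches and queues FindIocRes items on out_queue.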
                         self.handle_result(src, res);
                         continue;
@@ -601,20 +690,16 @@
                     Ready(Some(Err(e)))
                 }
                 Pending => {
-                    if loop_again {
+                    if have_progress {
                         continue;
                     } else {
-                        if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() {
-                            if self.stop_on_empty_queue {
-                                Ready(None)
-                            } else {
-                                match self.sleeper.poll_unpin(cx) {
-                                    Ready(_) => {
-                                        self.sleeper = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
-                                        continue;
-                                    }
-                                    Pending => Pending,
+                        if self.ready_for_end_of_stream() {
+                            match self.sleeper.poll_unpin(cx) {
+                                Ready(_) => {
+                                    self.sleeper = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
+                                    continue;
                                 }
+                                Pending => Pending,
                             }
                         } else {
                             Pending
diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs
index 6aa7ef4..d74537a 100644
--- a/netfetch/src/ca/search.rs
+++ b/netfetch/src/ca/search.rs
@@ -1,3 +1,4 @@
+use super::findioc::FindIocRes;
 use crate::ca::findioc::FindIocStream;
 use crate::conf::CaIngestOpts;
 use async_channel::Receiver;
@@ -8,15 +9,16 @@ use dbpg::iocindex::IocSearchIndexWorker;
 use err::Error;
 use futures_util::StreamExt;
 use log::*;
+use stats::IocFinderStats;
+use std::collections::VecDeque;
 use std::net::IpAddr;
 use std::net::SocketAddr;
+use std::net::SocketAddrV4;
+use std::sync::Arc;
 use std::time::Duration;
-use std::time::Instant;
 use taskrun::tokio;
 use tokio::task::JoinHandle;
 
-const DB_WORKER_COUNT: usize = 4;
-
 async fn resolve_address(addr_str: &str) -> Result<SocketAddr, Error> {
     const PORT_DEFAULT: u16 = 5064;
     let ac = match addr_str.parse::<IpAddr>() {
@@ -68,6 +70,7 @@ impl DbUpdateWorker {
     }
 }
 
+#[cfg(DISABLED)]
 pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(), Error> {
     info!("ca_search begin");
     let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
         .await
@@ -76,51 +79,17 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(),
     dbpg::schema::schema_check(&pg)
         .await
         .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
-    let mut addrs = Vec::new();
-    for s in opts.search() {
-        match resolve_address(s).await {
-            Ok(addr) => {
-                trace!("resolved {s} as {addr}");
-                addrs.push(addr);
-            }
-            Err(e) => {
-                error!("can not resolve {s} {e}");
-            }
-        }
-    }
-    let gw_addrs = {
-        let mut gw_addrs = Vec::new();
-        for s in opts.search_blacklist() {
-            match resolve_address(s).await {
-                Ok(addr) => {
-                    trace!("resolved {s} as {addr}");
-                    gw_addrs.push(addr);
-                }
-                Err(e) => {
-                    warn!("can not resolve {s} {e}");
-                }
-            }
-        }
-        gw_addrs
-    };
-    let addrs = addrs
-        .into_iter()
-        .filter_map(|x| match x {
-            SocketAddr::V4(x) => Some(x),
-            SocketAddr::V6(_) => {
-                error!("TODO check ipv6 support for IOCs");
-                None
-            }
-        })
-        .collect();
-    let mut finder = FindIocStream::new(addrs, Duration::from_millis(800), 20, 4);
-    finder.set_stop_on_empty_queue();
-    for ch in channels.iter() {
-        finder.push(ch.into());
-    }
+    let (search_tgts, blacklist) = search_tgts_from_opts(&opts).await?;
+
+    // let mut finder = FindIocStream::new(search_tgts, Duration::from_millis(800), 20, 16);
+    // finder.set_stop_on_empty_queue();
+    // for ch in channels.iter() {
+    //     finder.push(ch.into());
+    // }
+
+    const DB_WORKER_COUNT: usize = 1;
     let (dbtx, dbrx) = async_channel::bounded(64);
-
     let mut dbworkers = Vec::new();
     for _ in 0..DB_WORKER_COUNT {
         let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
@@ -201,3 +170,80 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(),
     info!("all done");
     Ok(())
 }
+
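+// Start the channel-search pipeline: resolve the search targets and the
+// blacklist from the options, spawn the FindIocStream driver task, and
+// return the input sender, the result receiver and the task join handles.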
+pub async fn ca_search_workers_start(
+    opts: &CaIngestOpts,
+    stats: Arc<IocFinderStats>,
+) -> Result<
+    (
+        Sender<String>,
+        Receiver<Result<VecDeque<FindIocRes>, Error>>,
+        JoinHandle<Result<(), Error>>,
+        Vec<JoinHandle<Result<(), Error>>>,
+    ),
+    Error,
+> {
+    let (search_tgts, blacklist) = search_tgts_from_opts(&opts).await?;
+    let batch_run_max = Duration::from_millis(800);
+    let (inp_tx, inp_rx) = async_channel::bounded(256);
+    let (out_tx, out_rx) = async_channel::bounded(256);
+    let finder = FindIocStream::new(inp_rx, search_tgts, blacklist, batch_run_max, 20, 16, stats);
+    let jh = taskrun::spawn(finder_run(finder, out_tx));
+    let jhs = Vec::new();
+    Ok((inp_tx, out_rx, jh, jhs))
+}
+
+async fn search_tgts_from_opts(opts: &CaIngestOpts) -> Result<(Vec<SocketAddrV4>, Vec<SocketAddrV4>), Error> {
+    let mut addrs = Vec::new();
+    for s in opts.search() {
+        match resolve_address(s).await {
+            Ok(addr) => {
+                trace!("resolved {s} as {addr}");
+                match addr {
+                    SocketAddr::V4(addr) => {
+                        addrs.push(addr);
+                    }
+                    SocketAddr::V6(_) => {
+                        error!("no ipv6 for epics");
+                    }
+                }
+            }
+            Err(e) => {
+                error!("can not resolve {s} {e}");
+            }
+        }
+    }
+    let blacklist = {
+        let mut addrs = Vec::new();
+        for s in opts.search_blacklist() {
+            match resolve_address(s).await {
+                Ok(addr) => {
+                    trace!("resolved {s} as {addr}");
+                    match addr {
+                        SocketAddr::V4(addr) => {
+                            addrs.push(addr);
+                        }
+                        SocketAddr::V6(_) => {
+                            error!("no ipv6 for epics");
+                        }
+                    }
+                }
+                Err(e) => {
+                    warn!("can not resolve {s} {e}");
+                }
+            }
+        }
+        addrs
+    };
+    Ok((addrs, blacklist))
+}
+
+async fn finder_run(mut finder: FindIocStream, tx: Sender<Result<VecDeque<FindIocRes>, Error>>) -> Result<(), Error> {
+    while let Some(item) = finder.next().await {
+        if let Err(_) = tx.send(item).await {
+            break;
+        }
+    }
+    debug!("finder_run done");
+    Ok(())
+}
diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs
index b5bd8a7..1954ac7 100644
--- a/netfetch/src/ca/statemap.rs
+++ b/netfetch/src/ca/statemap.rs
@@ -7,7 +7,7 @@ use std::net::SocketAddrV4;
 use std::time::Instant;
 use std::time::SystemTime;
 
-pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = i32::MIN + 1;
+pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = 14;
 
 #[derive(Debug)]
 pub enum CaConnStateValue {
@@ -60,7 +60,7 @@ pub enum WithStatusSeriesIdStateInner {
         #[serde(with = "humantime_serde")]
         since: SystemTime,
     },
-    SearchPending {
+    AddrSearchPending {
         #[serde(with = "humantime_serde")]
         since: SystemTime,
     },
@@ -72,6 +72,10 @@ pub enum WithStatusSeriesIdStateInner {
         #[serde(with = "humantime_serde")]
         since: SystemTime,
     },
+    MaybeWrongAddress {
+        #[serde(with = "humantime_serde")]
+        since: SystemTime,
+    },
 }
 
 #[derive(Debug, Clone, Serialize)]
@@ -104,6 +108,7 @@ pub enum ChannelStateValue {
 
 #[derive(Debug, Clone, Serialize)]
 pub struct ChannelState {
     pub value: ChannelStateValue,
+    pub running_cmd_id: Option,
 }
 
 #[derive(Debug, Clone, Serialize)]
diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs
index eca5d8a..a7ea0a1 100644
--- a/netfetch/src/metrics.rs
+++ b/netfetch/src/metrics.rs
@@ -22,6 +22,7 @@ use stats::CaConnStatsAggDiff;
 use stats::CaProtoStats;
 use stats::DaemonStats;
 use stats::InsertWorkerStats;
+use stats::IocFinderStats;
 use stats::SeriesByChannelStats;
 use std::collections::BTreeMap;
 use std::collections::HashMap;
@@ -39,6 +40,7 @@ pub struct StatsSet {
     ca_proto: Arc<CaProtoStats>,
     insert_worker_stats: Arc<InsertWorkerStats>,
     series_by_channel_stats: Arc<SeriesByChannelStats>,
+    ioc_finder_stats: Arc<IocFinderStats>,
     insert_frac: Arc,
 }
@@ -50,6 +52,7 @@ impl StatsSet {
         ca_proto: Arc<CaProtoStats>,
         insert_worker_stats: Arc<InsertWorkerStats>,
         series_by_channel_stats: Arc<SeriesByChannelStats>,
+        ioc_finder_stats: Arc<IocFinderStats>,
         insert_frac: Arc,
     ) -> Self {
         Self {
@@ -59,6 +62,7 @@ impl StatsSet {
+            ioc_finder_stats,
             insert_frac,
         }
     }
@@ -208,14 +212,14 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st
         get({
             // ||
             async move {
-                debug!("metrics");
-                let mut s1 = stats_set.daemon.prometheus();
+                let s1 = stats_set.daemon.prometheus();
                 let s2 = stats_set.ca_conn_set.prometheus();
                 let s3 = stats_set.insert_worker_stats.prometheus();
                 let s4 = stats_set.ca_conn.prometheus();
                 let s5 = stats_set.series_by_channel_stats.prometheus();
                 let s6 = stats_set.ca_proto.prometheus();
-                [s1, s2, s3, s4, s5, s6].join("")
+                let s7 = stats_set.ioc_finder_stats.prometheus();
+                [s1, s2, s3, s4, s5, s6, s7].join("")
             }
         }),
     )
diff --git a/netfetch/src/senderpolling.rs b/netfetch/src/senderpolling.rs
index 2568524..c6fe7d3 100644
--- a/netfetch/src/senderpolling.rs
+++ b/netfetch/src/senderpolling.rs
@@ -72,8 +72,8 @@ impl SenderPolling {
     }
 
     pub fn drop(&mut self) {
-        self.sender = None;
         self.fut = None;
+        self.sender = None;
     }
 
     pub fn len(&self) -> Option<usize> {
diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs
index c4d7d7a..dccefc0 100644
--- a/scywr/src/iteminsertqueue.rs
+++ b/scywr/src/iteminsertqueue.rs
@@ -14,6 +14,7 @@ use scylla::prepared_statement::PreparedStatement;
 use scylla::transport::errors::DbError;
 use scylla::transport::errors::QueryError;
 use scylla::QueryResult;
+use series::ChannelStatusSeriesId;
 use series::SeriesId;
 use smallvec::smallvec;
 use smallvec::SmallVec;
@@ -126,6 +127,7 @@ pub enum ChannelStatusClosedReason {
     IocTimeout,
     NoProtocol,
     ProtocolDone,
+    ConnectFail,
 }
 
 #[derive(Debug)]
@@ -152,6 +154,7 @@ impl ChannelStatus {
                 IocTimeout => 8,
                 NoProtocol => 9,
                 ProtocolDone => 10,
+                ConnectFail => 11,
             },
         }
     }
@@ -170,6 +173,7 @@ impl ChannelStatus {
             8 => Closed(IocTimeout),
             9 => Closed(NoProtocol),
             10 => Closed(ProtocolDone),
+            11 => Closed(ConnectFail),
             24 => AssignedToAddress,
             _ => {
                 return Err(err::Error::with_msg_no_trace(format!(
@@ -184,10 +188,20 @@ impl ChannelStatus {
 
 #[derive(Debug)]
 pub struct ChannelStatusItem {
     pub ts: SystemTime,
-    pub series: SeriesId,
+    pub cssid: ChannelStatusSeriesId,
     pub status: ChannelStatus,
 }
 
+impl ChannelStatusItem {
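+    // Build a Closed(IocTimeout) status item for a channel whose IOC
+    // connection timed out.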
+    pub fn new_closed_conn_timeout(ts: SystemTime, cssid: ChannelStatusSeriesId) -> Self {
+        Self {
+            ts,
+            cssid,
+            status: ChannelStatus::Closed(ChannelStatusClosedReason::IocTimeout),
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct InsertItem {
     pub series: SeriesId,
@@ -565,9 +579,9 @@ pub fn insert_channel_status_fut(
     let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
     let ts_lsp = ts - ts_msp;
     let kind = item.status.to_kind();
-    let series = item.series.id();
+    let cssid = item.cssid.id();
     let params = (
-        series as i64,
+        cssid as i64,
         ts_msp as i64,
         ts_lsp as i64,
         kind as i32,
@@ -581,7 +595,7 @@ pub fn insert_channel_status_fut(
     let params = (
         ts_msp as i64,
         ts_lsp as i64,
-        series as i64,
+        cssid as i64,
         kind as i32,
         ttls.index.as_secs() as i32,
     );
@@ -626,9 +640,9 @@ pub async fn insert_channel_status(
     let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
     let ts_lsp = ts - ts_msp;
     let kind = item.status.to_kind();
-    let series = item.series.id();
+    let cssid = item.cssid.id();
     let params = (
-        series as i64,
+        cssid as i64,
         ts_msp as i64,
         ts_lsp as i64,
         kind as i32,
@@ -641,7 +655,7 @@
     let params = (
         ts_msp as i64,
         ts_lsp as i64,
-        series as i64,
+        cssid as i64,
         kind as i32,
         ttl.as_secs() as i32,
     );
diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs
index 5fae62f..3a2a988 100644
--- a/scywr/src/schema.rs
+++ b/scywr/src/schema.rs
@@ -275,7 +275,7 @@ impl EvTabDim1 {
         format!("events_array_{}", self.sty)
     }
 
-    fn cql(&self) -> String {
+    fn cql_create(&self) -> String {
         use std::fmt::Write;
         let mut s = String::new();
         let ttl = self.default_time_to_live.as_secs();
@@ -332,8 +332,8 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> {
         default_time_to_live: dhours(1),
         compaction_window_size: dhours(12),
     };
-    if !check_table_readable(&desc.name(), scy).await? {
-        scy.query(desc.cql(), ()).await?;
+    if !has_table(&desc.name(), scy).await? {
+        scy.query(desc.cql_create(), ()).await?;
     }
     Ok(())
 }
diff --git a/stats/src/stats.rs b/stats/src/stats.rs
index d51e198..6a34a42 100644
--- a/stats/src/stats.rs
+++ b/stats/src/stats.rs
@@ -223,6 +223,10 @@ stats_proc::stats_struct!((
     channel_health_timeout,
     ioc_search_start,
     ioc_addr_found,
+    ioc_addr_not_found,
+    ioc_addr_result_for_unknown_channel,
+    ca_conn_task_begin,
+    ca_conn_task_done,
     ca_conn_task_join_done_ok,
     ca_conn_task_join_done_err,
     ca_conn_task_join_err,
@@ -241,6 +245,7 @@ stats_proc::stats_struct!((
         poll_no_progress_no_pending,
     ),
     values(
+        storage_insert_queue_len,
         storage_insert_tx_len,
         channel_info_query_queue_len,
         channel_info_query_sender_len,
@@ -253,6 +258,7 @@ stats_proc::stats_struct!((
         channel_unassigned,
         channel_assigned,
         channel_connected,
+        channel_maybe_wrong_address,
         channel_rogue,
     ),
 ),
@@ -304,6 +310,27 @@ stats_proc::stats_struct!((
             worker_finish,
         )
     ),
+    stats_struct(
+        name(IocFinderStats),
+        prefix(ioc_finder),
+        counters(
+            dbsearcher_batch_recv,
+            dbsearcher_item_recv,
+            dbsearcher_select_res_0,
+            dbsearcher_select_error_len_mismatch,
+            dbsearcher_batch_send,
+            dbsearcher_item_send,
+            ca_udp_error,
+            ca_udp_warn,
+            ca_udp_unaccounted_data,
+            ca_udp_batch_created,
+            ca_udp_io_error,
+            ca_udp_io_empty,
+            ca_udp_io_recv,
+            ca_udp_first_msg_not_version,
+        ),
+        values(db_lookup_workers,)
+    ),
 ));
 
 // #[cfg(DISABLED)]
@@ -417,7 +444,8 @@ stats_proc::stats_struct!((
         channel_unknown_address,
         channel_search_pending,
         channel_with_address,
-        channel_no_address
+        channel_no_address,
+        connset_health_lat_ema,
     ),
 ),
 agg(name(DaemonStatsAgg), parent(DaemonStats)),