diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 667a2f7..a5560ae 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -24,8 +24,8 @@ http = "0.2" url = "2.2" hyper = "0.14" chrono = "0.4" -humantime = "2.1" -humantime-serde = "1.1" +humantime = "2.1.0" +humantime-serde = "1.1.1" pin-project = "1" lazy_static = "1" libc = "0.2" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index f1d1c85..1ddc260 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1598,15 +1598,16 @@ impl CaConn { use Poll::*; match &mut self.state { CaConnState::Unconnected => { - trace4!("Unconnected"); let addr = self.remote_addr_dbg.clone(); + + // TODO issue a TCP-connect event (and later a "connected") trace!("create tcp connection to {:?}", (addr.ip(), addr.port())); + let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr)); self.state = CaConnState::Connecting(addr, Box::pin(fut)); Ok(Ready(Some(()))) } CaConnState::Connecting(ref addr, ref mut fut) => { - trace4!("Connecting"); match fut.poll_unpin(cx) { Ready(connect_result) => { match connect_result { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 08c6c45..cb44e50 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1,16 +1,17 @@ use super::conn::ChannelStateInfo; +use super::conn::CheckHealthResult; use super::conn::ConnCommandResult; use super::findioc::FindIocRes; use super::statemap; use super::statemap::ChannelState; +use super::statemap::ConnectionState; +use super::statemap::ConnectionStateValue; use crate::ca::conn::CaConn; use crate::ca::conn::CaConnEvent; use crate::ca::conn::CaConnEventValue; use crate::ca::conn::CaConnOpts; use crate::ca::conn::ConnCommand; use crate::ca::statemap::CaConnState; -use crate::ca::statemap::ConnectionState; -use crate::ca::statemap::ConnectionStateValue; use crate::ca::statemap::WithAddressState; use crate::daemon_common::Channel; use crate::errconv::ErrConv; @@ -31,13 +32,9 @@ use futures_util::Stream; use futures_util::StreamExt; use log::*; use netpod::Database; -use netpod::Shape; -use scywr::iteminsertqueue::ChannelStatusItem; use scywr::iteminsertqueue::QueryItem; use serde::Serialize; -use series::series::Existence; use series::ChannelStatusSeriesId; -use series::SeriesId; use statemap::ActiveChannelState; use statemap::CaConnStateValue; use statemap::ChannelStateMap; @@ -62,7 +59,6 @@ use std::time::Instant; use std::time::SystemTime; use taskrun::tokio; -const DO_ASSIGN_TO_CA_CONN: bool = true; const CHECK_CHANS_PER_TICK: usize = 10000; pub const SEARCH_BATCH_MAX: usize = 256; pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4; @@ -70,13 +66,11 @@ const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000); const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000); const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000); +const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(8000); +const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(8000); // TODO put all these into metrics static SEARCH_REQ_MARK_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_REQ_SEND_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_REQ_RECV_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_REQ_BATCH_SEND_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0); #[allow(unused)] macro_rules! trace2 { @@ -176,7 +170,6 @@ pub struct ChannelStatusesRequest { #[derive(Debug, Clone, Serialize)] pub struct ChannelStatusesResponse { - pub channels_ca_conn: BTreeMap, pub channels_ca_conn_set: BTreeMap, } @@ -302,7 +295,6 @@ pub struct CaConnSet { local_epics_hostname: String, ca_conn_ress: BTreeMap, channel_states: ChannelStateMap, - ca_conn_channel_states: BTreeMap, connset_inp_rx: Receiver, channel_info_query_queue: VecDeque, channel_info_query_sender: SenderPolling, @@ -330,6 +322,8 @@ pub struct CaConnSet { thr_msg_storage_len: ThrottleTrace, did_connset_out_queue: bool, ca_proto_stats: Arc, + rogue_channel_count: u64, + have_conn_command: bool, } impl CaConnSet { @@ -355,7 +349,6 @@ impl CaConnSet { local_epics_hostname, ca_conn_ress: BTreeMap::new(), channel_states: ChannelStateMap::new(), - ca_conn_channel_states: BTreeMap::new(), connset_inp_rx, channel_info_query_queue: VecDeque::new(), channel_info_query_sender: SenderPolling::new(channel_info_query_tx.clone()), @@ -384,6 +377,8 @@ impl CaConnSet { thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)), did_connset_out_queue: false, ca_proto_stats: ca_proto_stats.clone(), + rogue_channel_count: 0, + have_conn_command: false, }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); @@ -474,23 +469,23 @@ impl CaConnSet { Ok(()) } - fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> { - trace3!("handle_add_channel {}", add.name); + fn handle_add_channel(&mut self, cmd: ChannelAdd) -> Result<(), Error> { if self.shutdown_stopping { trace3!("handle_add_channel but shutdown_stopping"); return Ok(()); } + trace3!("handle_add_channel {}", cmd.name); + self.stats.channel_add().inc(); // TODO should I add the transition through ActiveChannelState::Init as well? - let ch = Channel::new(add.name.clone()); + let ch = Channel::new(cmd.name.clone()); let _st = self.channel_states.inner().entry(ch).or_insert_with(|| ChannelState { value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { since: SystemTime::now(), }), }); - self.stats.channel_wait_for_status_id.inc(); let item = ChannelInfoQuery { - backend: add.backend, - channel: add.name, + backend: cmd.backend, + channel: cmd.name, scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE, shape_dims: Vec::new(), tx: Box::pin(SeriesLookupSender { @@ -501,27 +496,28 @@ impl CaConnSet { Ok(()) } - fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> { - trace3!("handle_add_channel_with_status_id {}", add.name); + fn handle_add_channel_with_status_id(&mut self, cmd: ChannelAddWithStatusId) -> Result<(), Error> { + trace3!("handle_add_channel_with_status_id {}", cmd.name); if self.shutdown_stopping { debug!("handle_add_channel but shutdown_stopping"); return Ok(()); } - let ch = Channel::new(add.name.clone()); + self.stats.channel_status_series_found().inc(); + let ch = Channel::new(cmd.name.clone()); if let Some(chst) = self.channel_states.inner().get_mut(&ch) { if let ChannelStateValue::Active(chst2) = &mut chst.value { if let ActiveChannelState::WaitForStatusSeriesId { .. } = chst2 { *chst2 = ActiveChannelState::WithStatusSeriesId { - status_series_id: add.cssid, + status_series_id: cmd.cssid, state: WithStatusSeriesIdState { inner: WithStatusSeriesIdStateInner::SearchPending { since: SystemTime::now(), }, }, }; - self.stats.channel_wait_for_address.inc(); - let qu = IocAddrQuery { name: add.name }; + let qu = IocAddrQuery { name: cmd.name }; self.find_ioc_query_queue.push_back(qu); + self.stats.ioc_search_start().inc(); } else { warn!("TODO have a status series id but no more channel"); } @@ -534,28 +530,53 @@ impl CaConnSet { Ok(()) } - fn handle_add_channel_with_addr(&mut self, add: ChannelAddWithAddr) -> Result<(), Error> { + fn handle_add_channel_with_addr(&mut self, cmd: ChannelAddWithAddr) -> Result<(), Error> { if self.shutdown_stopping { trace3!("handle_add_channel but shutdown_stopping"); return Ok(()); } - if !self.ca_conn_ress.contains_key(&add.addr) { - let c = self.create_ca_conn(add.clone())?; - self.ca_conn_ress.insert(add.addr, c); + let addr_v4 = if let SocketAddr::V4(x) = cmd.addr { + x + } else { + return Err(Error::with_msg_no_trace("ipv4 for epics")); + }; + let ch = Channel::new(cmd.name.clone()); + if let Some(chst) = self.channel_states.inner().get_mut(&ch) { + if let ChannelStateValue::Active(ast) = &mut chst.value { + if let ActiveChannelState::WithStatusSeriesId { + status_series_id: _, + state: st3, + } = ast + { + let tsnow = SystemTime::now(); + *st3 = WithStatusSeriesIdState { + inner: WithStatusSeriesIdStateInner::WithAddress { + addr: addr_v4, + state: WithAddressState::Assigned(ConnectionState { + updated: tsnow, + value: ConnectionStateValue::Unknown, + }), + }, + }; + if !self.ca_conn_ress.contains_key(&cmd.addr) { + let c = self.create_ca_conn(cmd.clone())?; + self.ca_conn_ress.insert(cmd.addr, c); + } + let conn_ress = self.ca_conn_ress.get_mut(&cmd.addr).unwrap(); + let cmd = ConnCommand::channel_add(cmd.name, cmd.cssid); + conn_ress.cmd_queue.push_back(cmd); + self.have_conn_command = true; + } + } } - let conn_ress = self.ca_conn_ress.get_mut(&add.addr).unwrap(); - let cmd = ConnCommand::channel_add(add.name, add.cssid); - // TODO not the nicest - let tx = conn_ress.sender.clone(); - tokio::spawn(async move { tx.send(cmd).await }); Ok(()) } - fn handle_remove_channel(&mut self, add: ChannelRemove) -> Result<(), Error> { + fn handle_remove_channel(&mut self, cmd: ChannelRemove) -> Result<(), Error> { if self.shutdown_stopping { return Ok(()); } - let ch = Channel::new(add.name); + let ch = Channel::new(cmd.name); if let Some(k) = self.channel_states.inner().get_mut(&ch) { match &k.value { ChannelStateValue::Active(j) => match j { @@ -595,6 +616,7 @@ impl CaConnSet { if self.shutdown_stopping { return Ok(()); } + self.stats.ioc_addr_found().inc(); trace3!("handle_ioc_query_result"); for e in res { let ch = Channel::new(e.channel.clone()); @@ -619,7 +641,6 @@ impl CaConnSet { addr, state: WithAddressState::Unassigned { since }, }; - // TODO move state change also in there? self.handle_add_channel_with_addr(add)?; } else { trace3!("ioc not found {e:?}"); @@ -647,6 +668,8 @@ impl CaConnSet { .trigger("msg", &[&self.storage_insert_sender.len()]); debug!("TODO handle_check_health"); + self.check_channel_states()?; + // Trigger already the next health check, but use the current data that we have. // TODO try to deliver a command to CaConn @@ -657,6 +680,7 @@ impl CaConnSet { let item = ConnCommand::check_health(); res.cmd_queue.push_back(item); } + self.have_conn_command = true; let ts2 = Instant::now(); let item = CaConnSetItem::Healthy(ts1, ts2); @@ -670,23 +694,14 @@ impl CaConnSet { } debug!("handle_channel_statuses_req"); let reg1 = regex::Regex::new(&req.name)?; - let channels_ca_conn = self - .ca_conn_channel_states - .iter() - .filter(|x| reg1.is_match(x.0)) - .map(|(k, v)| (k.to_string(), v.clone())) - .collect(); let channels_ca_conn_set = self .channel_states .inner() .iter() - .filter(|(k, v)| reg1.is_match(k.id())) + .filter(|(k, _)| reg1.is_match(k.id())) .map(|(k, v)| (k.id().to_string(), v.clone())) .collect(); - let item = ChannelStatusesResponse { - channels_ca_conn, - channels_ca_conn_set, - }; + let item = ChannelStatusesResponse { channels_ca_conn_set }; if req.tx.try_send(item).is_err() { self.stats.response_tx_fail.inc(); } @@ -701,7 +716,7 @@ impl CaConnSet { self.shutdown_stopping = true; self.channel_info_query_sender.drop(); self.find_ioc_query_sender.drop(); - for (addr, res) in self.ca_conn_ress.iter() { + for (_addr, res) in self.ca_conn_ress.iter() { let item = ConnCommand::shutdown(); // TODO not the nicest let tx = res.sender.clone(); @@ -713,14 +728,51 @@ impl CaConnSet { fn handle_conn_command_result(&mut self, addr: SocketAddr, res: ConnCommandResult) -> Result<(), Error> { use crate::ca::conn::ConnCommandResultKind::*; match res.kind { - CheckHealth(health) => { - // debug!("handle_conn_command_result {addr}"); - for (k, v) in health.channel_statuses { - self.ca_conn_channel_states.insert(k, v); + CheckHealth(res) => self.apply_ca_conn_health_update(addr, res), + } + } + + fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: CheckHealthResult) -> Result<(), Error> { + let tsnow = SystemTime::now(); + self.rogue_channel_count = 0; + for (k, v) in res.channel_statuses { + let ch = Channel::new(k); + if let Some(st1) = self.channel_states.inner().get_mut(&ch) { + if let ChannelStateValue::Active(st2) = &mut st1.value { + if let ActiveChannelState::WithStatusSeriesId { + status_series_id: _, + state: st3, + } = st2 + { + if let WithStatusSeriesIdStateInner::WithAddress { + addr: conn_addr, + state: st4, + } = &mut st3.inner + { + if SocketAddr::V4(*conn_addr) != addr { + self.rogue_channel_count += 1; + } + if let WithAddressState::Assigned(st5) = st4 { + st5.updated = tsnow; + st5.value = ConnectionStateValue::ChannelStateInfo(v); + } else { + self.rogue_channel_count += 1; + } + } else { + self.rogue_channel_count += 1; + } + } else { + self.rogue_channel_count += 1; + } + } else { + self.rogue_channel_count += 1; } - Ok(()) + } else { + self.rogue_channel_count += 1; } } + self.stats.channel_rogue.set(self.rogue_channel_count); + Ok(()) } fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> { @@ -733,13 +785,11 @@ impl CaConnSet { warn!("end-of-stream received for non-existent CaConn {addr}"); } self.remove_status_for_addr(addr)?; - debug!("still CaConn left {}", self.ca_conn_ress.len()); + trace2!("still CaConn left {}", self.ca_conn_ress.len()); Ok(()) } fn remove_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> { - self.ca_conn_channel_states - .retain(|_k, v| SocketAddr::V4(v.addr) != addr); Ok(()) } @@ -764,7 +814,6 @@ impl CaConnSet { } else { return Err(Error::with_msg_no_trace("only ipv4 for epics")); }; - debug!("create new CaConn {:?}", addr); let conn = CaConn::new( opts, add.backend.clone(), @@ -838,7 +887,7 @@ impl CaConnSet { }, )) .await?; - trace!("ca_conn_consumer signaled {}", addr); + trace2!("ca_conn_consumer signaled {}", addr); ret } @@ -971,8 +1020,10 @@ impl CaConnSet { Ok(()) } - async fn check_channel_states(&mut self) -> Result<(), Error> { + fn check_channel_states(&mut self) -> Result<(), Error> { let (mut search_pending_count,) = self.update_channel_state_counts(); + let mut cmd_remove_channel = Vec::new(); + let mut cmd_add_channel = Vec::new(); let k = self.chan_check_next.take(); let it = if let Some(last) = k { trace!("check_chans start at {:?}", last); @@ -980,13 +1031,14 @@ impl CaConnSet { } else { self.channel_states.inner().range_mut(..) }; + let tsnow = SystemTime::now(); - let mut attempt_series_search = true; for (i, (ch, st)) in it.enumerate() { match &mut st.value { ChannelStateValue::Active(st2) => match st2 { ActiveChannelState::Init { since: _ } => { - todo!() + // TODO no longer used? remove? + self.stats.logic_error().inc(); } ActiveChannelState::WaitForStatusSeriesId { since } => { let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); @@ -1013,60 +1065,38 @@ impl CaConnSet { } } WithStatusSeriesIdStateInner::SearchPending { since } => { - //info!("SearchPending {} {:?}", i, ch); let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); if dt > SEARCH_PENDING_TIMEOUT { - info!("Search timeout for {ch:?}"); + debug!("TODO should receive some error indication instead of timeout for {ch:?}"); state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow }; search_pending_count -= 1; } } - WithStatusSeriesIdStateInner::WithAddress { addr: addr_v4, state } => { - //info!("WithAddress {} {:?}", i, ch); + WithStatusSeriesIdStateInner::WithAddress { + addr: addr_v4, + state: st3, + } => { use WithAddressState::*; - match state { + match st3 { Unassigned { since } => { - // TODO do I need this case anymore? - #[cfg(DISABLED)] - if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow { - let backend = self.backend.clone(); - let addr = SocketAddr::V4(*addr_v4); - let name = ch.id().into(); - let cssid = status_series_id.clone(); - let local_epics_hostname = self.local_epics_hostname.clone(); - // This operation is meant to complete very quickly - let add = ChannelAdd { - backend: backend, - name: name, - addr, - cssid, - local_epics_hostname, + if *since + CHANNEL_UNASSIGNED_TIMEOUT < tsnow { + let cmd = ChannelAddWithAddr { + backend: self.backend.clone(), + name: ch.id().into(), + local_epics_hostname: self.local_epics_hostname.clone(), + cssid: status_series_id.clone(), + addr: SocketAddr::V4(*addr_v4), }; - self.handle_add_channel(add).await?; - let cs = ConnectionState { - updated: tsnow, - value: ConnectionStateValue::Unconnected, - }; - // TODO if a matching CaConn does not yet exist, it gets created - // via the command through the channel, so we can not await it here. - // Therefore, would be good to have a separate status entry out of - // the ca_conn_ress right here in a sync fashion. - *state = WithAddressState::Assigned(cs); - let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: tsnow, - series: SeriesId::new(status_series_id.id()), - status: scywr::iteminsertqueue::ChannelStatus::AssignedToAddress, - }); - match self.storage_insert_tx.send(item).await { - Ok(_) => {} - Err(_) => { - // TODO feed into throttled log, or count as unlogged - } - } + cmd_add_channel.push(cmd); } } - Assigned(_) => { - // TODO check if channel is healthy and alive + Assigned(st4) => { + if st4.updated + CHANNEL_HEALTH_TIMEOUT < tsnow { + self.stats.channel_health_timeout().inc(); + let addr = SocketAddr::V4(*addr_v4); + cmd_remove_channel.push((addr, ch.clone())); + *st3 = WithAddressState::Unassigned { since: tsnow }; + } } } } @@ -1087,17 +1117,27 @@ impl CaConnSet { break; } } + for (addr, ch) in cmd_remove_channel { + if let Some(g) = self.ca_conn_ress.get_mut(&addr) { + let cmd = ConnCommand::channel_remove(ch.id().into()); + g.cmd_queue.push_back(cmd); + self.have_conn_command = true; + } + } + for cmd in cmd_add_channel { + self.handle_add_channel_with_addr(cmd)?; + } Ok(()) } // TODO should use both counters and values fn update_channel_state_counts(&mut self) -> (u64,) { - return (0,); let mut unknown_address = 0; let mut search_pending = 0; + let mut no_address = 0; let mut unassigned = 0; let mut assigned = 0; - let mut no_address = 0; + let mut connected = 0; for (_ch, st) in self.channel_states.inner().iter() { match &st.value { ChannelStateValue::Active(st2) => match st2 { @@ -1118,9 +1158,14 @@ impl CaConnSet { WithAddressState::Unassigned { .. } => { unassigned += 1; } - WithAddressState::Assigned(_) => { - assigned += 1; - } + WithAddressState::Assigned(st3) => match &st3.value { + ConnectionStateValue::Unknown => { + assigned += 1; + } + ConnectionStateValue::ChannelStateInfo(_) => { + connected += 1; + } + }, }, WithStatusSeriesIdStateInner::NoAddress { .. } => { no_address += 1; @@ -1128,39 +1173,42 @@ impl CaConnSet { }, }, ChannelStateValue::ToRemove { .. } => { - unknown_address += 1; + unassigned += 1; } } } - self.stats.channel_unknown_address.__set(unknown_address); - self.stats.channel_search_pending.__set(search_pending); - self.stats.channel_no_address.__set(no_address); - self.stats.channel_unassigned.__set(unassigned); - self.stats.channel_assigned.__set(assigned); + self.stats.channel_unknown_address.set(unknown_address); + self.stats.channel_search_pending.set(search_pending); + self.stats.channel_no_address.set(no_address); + self.stats.channel_unassigned.set(unassigned); + self.stats.channel_assigned.set(assigned); + self.stats.channel_connected.set(connected); (search_pending,) } fn try_push_ca_conn_cmds(&mut self) { - // debug!("try_push_ca_conn_cmds"); - for (_, v) in self.ca_conn_ress.iter_mut() { - loop { - break if let Some(item) = v.cmd_queue.pop_front() { - match v.sender.try_send(item) { - Ok(()) => continue, - Err(e) => match e { - async_channel::TrySendError::Full(e) => { - self.stats.try_push_ca_conn_cmds_full.inc(); - v.cmd_queue.push_front(e); - break; - } - async_channel::TrySendError::Closed(_) => { - // TODO - self.stats.try_push_ca_conn_cmds_closed.inc(); - break; - } - }, - } - }; + if self.have_conn_command { + self.have_conn_command = false; + for (_, v) in self.ca_conn_ress.iter_mut() { + loop { + break if let Some(item) = v.cmd_queue.pop_front() { + match v.sender.try_send(item) { + Ok(()) => continue, + Err(e) => match e { + async_channel::TrySendError::Full(e) => { + self.stats.try_push_ca_conn_cmds_full.inc(); + v.cmd_queue.push_front(e); + self.have_conn_command = true; + } + async_channel::TrySendError::Closed(_) => { + // TODO + self.stats.try_push_ca_conn_cmds_closed.inc(); + self.have_conn_command = true; + } + }, + } + }; + } } } } @@ -1210,19 +1258,19 @@ impl Stream for CaConnSet { Ready(x) => { let addr = *addr; self.await_ca_conn_jhs.pop_front(); - debug!("await_ca_conn_jhs still jhs left {}", self.await_ca_conn_jhs.len()); + let left = self.await_ca_conn_jhs.len(); match x { Ok(Ok(())) => { self.stats.ca_conn_task_join_done_ok.inc(); - debug!("CaConn {addr} finished well"); + debug!("CaConn {addr} finished well left {left}"); } Ok(Err(e)) => { self.stats.ca_conn_task_join_done_err.inc(); - error!("CaConn {addr} task error: {e}"); + error!("CaConn {addr} task error: {e} left {left}"); } Err(e) => { self.stats.ca_conn_task_join_err.inc(); - error!("CaConn {addr} join error: {e}"); + error!("CaConn {addr} join error: {e} left {left}"); } } have_progress = true; @@ -1233,7 +1281,6 @@ impl Stream for CaConnSet { } } - // TODO should never send from here, track. if self.storage_insert_sender.is_idle() { if let Some(item) = self.storage_insert_queue.pop_front() { self.stats.logic_error().inc(); diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index dff54b8..b5bd8a7 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -1,9 +1,7 @@ +use crate::ca::conn::ChannelStateInfo; use crate::daemon_common::Channel; -use async_channel::Receiver; use serde::Serialize; -use series::series::Existence; use series::ChannelStatusSeriesId; -use series::SeriesId; use std::collections::BTreeMap; use std::net::SocketAddrV4; use std::time::Instant; @@ -35,13 +33,13 @@ impl CaConnState { #[derive(Debug, Clone, Serialize)] pub enum ConnectionStateValue { - Unconnected, - Connected, + Unknown, + ChannelStateInfo(ChannelStateInfo), } #[derive(Debug, Clone, Serialize)] pub struct ConnectionState { - //#[serde(with = "serde_Instant")] + #[serde(with = "humantime_serde")] pub updated: SystemTime, pub value: ConnectionStateValue, } @@ -50,6 +48,7 @@ pub struct ConnectionState { pub enum WithAddressState { Unassigned { //#[serde(with = "serde_Instant")] + #[serde(with = "humantime_serde")] since: SystemTime, }, Assigned(ConnectionState), @@ -58,10 +57,11 @@ pub enum WithAddressState { #[derive(Debug, Clone, Serialize)] pub enum WithStatusSeriesIdStateInner { UnknownAddress { + #[serde(with = "humantime_serde")] since: SystemTime, }, SearchPending { - //#[serde(with = "serde_Instant")] + #[serde(with = "humantime_serde")] since: SystemTime, }, WithAddress { @@ -69,6 +69,7 @@ pub enum WithStatusSeriesIdStateInner { state: WithAddressState, }, NoAddress { + #[serde(with = "humantime_serde")] since: SystemTime, }, } @@ -81,9 +82,11 @@ pub struct WithStatusSeriesIdState { #[derive(Debug, Clone, Serialize)] pub enum ActiveChannelState { Init { + #[serde(with = "humantime_serde")] since: SystemTime, }, WaitForStatusSeriesId { + #[serde(with = "humantime_serde")] since: SystemTime, }, WithStatusSeriesId { diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index b050561..eca5d8a 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -3,6 +3,7 @@ use crate::ca::connset::CaConnSetEvent; use crate::ca::connset::ChannelStatusesRequest; use crate::ca::connset::ChannelStatusesResponse; use crate::ca::connset::ConnSetCmd; +use crate::ca::statemap::ChannelState; use crate::daemon_common::DaemonEvent; use async_channel::Receiver; use async_channel::Sender; @@ -22,6 +23,7 @@ use stats::CaProtoStats; use stats::DaemonStats; use stats::InsertWorkerStats; use stats::SeriesByChannelStats; +use std::collections::BTreeMap; use std::collections::HashMap; use std::net::SocketAddrV4; use std::sync::atomic::AtomicU64; @@ -134,11 +136,12 @@ async fn channel_state( panic!("TODO"); } -// axum::Json +// ChannelStatusesResponse +// BTreeMap async fn channel_states( params: HashMap, tx: Sender, -) -> axum::Json { +) -> axum::Json> { let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); let limit = params .get("limit") @@ -149,17 +152,9 @@ async fn channel_states( let req = ChannelStatusesRequest { name, limit, tx: tx2 }; let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req)); // TODO handle error - tx.send(item).await; + tx.send(item).await.unwrap(); let res = rx2.recv().await.unwrap(); - // match serde_json::to_string(&res) { - // Ok(x) => x, - // Err(e) => { - // error!("Serialize error {e}"); - // Err::<(), _>(e).unwrap(); - // panic!(); - // } - // } - axum::Json(res) + axum::Json(res.channels_ca_conn_set) } async fn extra_inserts_conf_set(v: ExtraInsertsConf, dcom: Arc) -> axum::Json { diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index 32d40d8..f127820 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -5,11 +5,12 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -futures-util = "0.3" +futures-util = "0.3.28" async-channel = "1.9.0" scylla = "0.9.0" -smallvec = "1.11" +smallvec = "1.11.0" pin-project = "1.1.3" +stackfuture = "0.3.0" log = { path = "../log" } stats = { path = "../stats" } series = { path = "../series" } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index d72c152..c4d7d7a 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -17,6 +17,7 @@ use scylla::QueryResult; use series::SeriesId; use smallvec::smallvec; use smallvec::SmallVec; +use stackfuture::StackFuture; use stats::InsertWorkerStats; use std::net::SocketAddrV4; use std::pin::Pin; @@ -289,12 +290,15 @@ where InsertFut::new(scy, qu, params) } +#[pin_project::pin_project] pub struct InsertFut { #[allow(unused)] scy: Arc, #[allow(unused)] qu: Arc, fut: Pin> + Send>>, + // #[pin] + // fut: StackFuture<'static, Result, { 1024 * 3 }>, } impl InsertFut { @@ -308,6 +312,7 @@ impl InsertFut { let fut = scy_ref.execute_paged(qu_ref, params, None); let fut = taskrun::tokio::task::unconstrained(fut); let fut = Box::pin(fut); + // let fut = StackFuture::from(fut); Self { scy, qu, fut } } } @@ -315,8 +320,9 @@ impl InsertFut { impl Future for InsertFut { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - self.fut.poll_unpin(cx) + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + this.fut.poll_unpin(cx) } } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 101d6b9..d51e198 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -216,13 +216,13 @@ stats_proc::stats_struct!(( ), stats_struct( name(CaConnSetStats), + prefix(connset), counters( - channel_unknown_address, - channel_search_pending, - channel_no_address, - channel_with_address, - channel_unassigned, - channel_assigned, + channel_add, + channel_status_series_found, + channel_health_timeout, + ioc_search_start, + ioc_addr_found, ca_conn_task_join_done_ok, ca_conn_task_join_done_err, ca_conn_task_join_err, @@ -231,8 +231,6 @@ stats_proc::stats_struct!(( response_tx_fail, try_push_ca_conn_cmds_full, try_push_ca_conn_cmds_closed, - channel_wait_for_status_id, - channel_wait_for_address, logic_error, ready_for_end_of_stream, ready_for_end_of_stream_with_progress, @@ -241,7 +239,6 @@ stats_proc::stats_struct!(( poll_pending, poll_reloop, poll_no_progress_no_pending, - test_1, ), values( storage_insert_tx_len, @@ -250,6 +247,13 @@ stats_proc::stats_struct!(( channel_info_res_tx_len, find_ioc_query_sender_len, ca_conn_res_tx_len, + channel_unknown_address, + channel_search_pending, + channel_no_address, + channel_unassigned, + channel_assigned, + channel_connected, + channel_rogue, ), ), // agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)),