Readd on timeout

This commit is contained in:
Dominik Werder
2023-09-20 15:07:48 +02:00
parent 5fa77acf5c
commit 38ea01e724
8 changed files with 235 additions and 178 deletions

View File

@@ -24,8 +24,8 @@ http = "0.2"
url = "2.2"
hyper = "0.14"
chrono = "0.4"
humantime = "2.1"
humantime-serde = "1.1"
humantime = "2.1.0"
humantime-serde = "1.1.1"
pin-project = "1"
lazy_static = "1"
libc = "0.2"

View File

@@ -1598,15 +1598,16 @@ impl CaConn {
use Poll::*;
match &mut self.state {
CaConnState::Unconnected => {
trace4!("Unconnected");
let addr = self.remote_addr_dbg.clone();
// TODO issue a TCP-connect event (and later a "connected")
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr));
self.state = CaConnState::Connecting(addr, Box::pin(fut));
Ok(Ready(Some(())))
}
CaConnState::Connecting(ref addr, ref mut fut) => {
trace4!("Connecting");
match fut.poll_unpin(cx) {
Ready(connect_result) => {
match connect_result {

View File

@@ -1,16 +1,17 @@
use super::conn::ChannelStateInfo;
use super::conn::CheckHealthResult;
use super::conn::ConnCommandResult;
use super::findioc::FindIocRes;
use super::statemap;
use super::statemap::ChannelState;
use super::statemap::ConnectionState;
use super::statemap::ConnectionStateValue;
use crate::ca::conn::CaConn;
use crate::ca::conn::CaConnEvent;
use crate::ca::conn::CaConnEventValue;
use crate::ca::conn::CaConnOpts;
use crate::ca::conn::ConnCommand;
use crate::ca::statemap::CaConnState;
use crate::ca::statemap::ConnectionState;
use crate::ca::statemap::ConnectionStateValue;
use crate::ca::statemap::WithAddressState;
use crate::daemon_common::Channel;
use crate::errconv::ErrConv;
@@ -31,13 +32,9 @@ use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
use netpod::Database;
use netpod::Shape;
use scywr::iteminsertqueue::ChannelStatusItem;
use scywr::iteminsertqueue::QueryItem;
use serde::Serialize;
use series::series::Existence;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use statemap::ActiveChannelState;
use statemap::CaConnStateValue;
use statemap::ChannelStateMap;
@@ -62,7 +59,6 @@ use std::time::Instant;
use std::time::SystemTime;
use taskrun::tokio;
const DO_ASSIGN_TO_CA_CONN: bool = true;
const CHECK_CHANS_PER_TICK: usize = 10000;
pub const SEARCH_BATCH_MAX: usize = 256;
pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4;
@@ -70,13 +66,11 @@ const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000);
const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000);
const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000);
const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000);
const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(8000);
const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(8000);
// TODO put all these into metrics
static SEARCH_REQ_MARK_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_SEND_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_RECV_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_REQ_BATCH_SEND_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0);
#[allow(unused)]
macro_rules! trace2 {
@@ -176,7 +170,6 @@ pub struct ChannelStatusesRequest {
#[derive(Debug, Clone, Serialize)]
pub struct ChannelStatusesResponse {
pub channels_ca_conn: BTreeMap<String, ChannelStateInfo>,
pub channels_ca_conn_set: BTreeMap<String, ChannelState>,
}
@@ -302,7 +295,6 @@ pub struct CaConnSet {
local_epics_hostname: String,
ca_conn_ress: BTreeMap<SocketAddr, CaConnRes>,
channel_states: ChannelStateMap,
ca_conn_channel_states: BTreeMap<String, ChannelStateInfo>,
connset_inp_rx: Receiver<CaConnSetEvent>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sender: SenderPolling<ChannelInfoQuery>,
@@ -330,6 +322,8 @@ pub struct CaConnSet {
thr_msg_storage_len: ThrottleTrace,
did_connset_out_queue: bool,
ca_proto_stats: Arc<CaProtoStats>,
rogue_channel_count: u64,
have_conn_command: bool,
}
impl CaConnSet {
@@ -355,7 +349,6 @@ impl CaConnSet {
local_epics_hostname,
ca_conn_ress: BTreeMap::new(),
channel_states: ChannelStateMap::new(),
ca_conn_channel_states: BTreeMap::new(),
connset_inp_rx,
channel_info_query_queue: VecDeque::new(),
channel_info_query_sender: SenderPolling::new(channel_info_query_tx.clone()),
@@ -384,6 +377,8 @@ impl CaConnSet {
thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)),
did_connset_out_queue: false,
ca_proto_stats: ca_proto_stats.clone(),
rogue_channel_count: 0,
have_conn_command: false,
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
@@ -474,23 +469,23 @@ impl CaConnSet {
Ok(())
}
fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> {
trace3!("handle_add_channel {}", add.name);
fn handle_add_channel(&mut self, cmd: ChannelAdd) -> Result<(), Error> {
if self.shutdown_stopping {
trace3!("handle_add_channel but shutdown_stopping");
return Ok(());
}
trace3!("handle_add_channel {}", cmd.name);
self.stats.channel_add().inc();
// TODO should I add the transition through ActiveChannelState::Init as well?
let ch = Channel::new(add.name.clone());
let ch = Channel::new(cmd.name.clone());
let _st = self.channel_states.inner().entry(ch).or_insert_with(|| ChannelState {
value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId {
since: SystemTime::now(),
}),
});
self.stats.channel_wait_for_status_id.inc();
let item = ChannelInfoQuery {
backend: add.backend,
channel: add.name,
backend: cmd.backend,
channel: cmd.name,
scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE,
shape_dims: Vec::new(),
tx: Box::pin(SeriesLookupSender {
@@ -501,27 +496,28 @@ impl CaConnSet {
Ok(())
}
fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> {
trace3!("handle_add_channel_with_status_id {}", add.name);
fn handle_add_channel_with_status_id(&mut self, cmd: ChannelAddWithStatusId) -> Result<(), Error> {
trace3!("handle_add_channel_with_status_id {}", cmd.name);
if self.shutdown_stopping {
debug!("handle_add_channel but shutdown_stopping");
return Ok(());
}
let ch = Channel::new(add.name.clone());
self.stats.channel_status_series_found().inc();
let ch = Channel::new(cmd.name.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
if let ChannelStateValue::Active(chst2) = &mut chst.value {
if let ActiveChannelState::WaitForStatusSeriesId { .. } = chst2 {
*chst2 = ActiveChannelState::WithStatusSeriesId {
status_series_id: add.cssid,
status_series_id: cmd.cssid,
state: WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::SearchPending {
since: SystemTime::now(),
},
},
};
self.stats.channel_wait_for_address.inc();
let qu = IocAddrQuery { name: add.name };
let qu = IocAddrQuery { name: cmd.name };
self.find_ioc_query_queue.push_back(qu);
self.stats.ioc_search_start().inc();
} else {
warn!("TODO have a status series id but no more channel");
}
@@ -534,28 +530,53 @@ impl CaConnSet {
Ok(())
}
fn handle_add_channel_with_addr(&mut self, add: ChannelAddWithAddr) -> Result<(), Error> {
fn handle_add_channel_with_addr(&mut self, cmd: ChannelAddWithAddr) -> Result<(), Error> {
if self.shutdown_stopping {
trace3!("handle_add_channel but shutdown_stopping");
return Ok(());
}
if !self.ca_conn_ress.contains_key(&add.addr) {
let c = self.create_ca_conn(add.clone())?;
self.ca_conn_ress.insert(add.addr, c);
let addr_v4 = if let SocketAddr::V4(x) = cmd.addr {
x
} else {
return Err(Error::with_msg_no_trace("ipv4 for epics"));
};
let ch = Channel::new(cmd.name.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
if let ChannelStateValue::Active(ast) = &mut chst.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state: st3,
} = ast
{
let tsnow = SystemTime::now();
*st3 = WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::WithAddress {
addr: addr_v4,
state: WithAddressState::Assigned(ConnectionState {
updated: tsnow,
value: ConnectionStateValue::Unknown,
}),
},
};
if !self.ca_conn_ress.contains_key(&cmd.addr) {
let c = self.create_ca_conn(cmd.clone())?;
self.ca_conn_ress.insert(cmd.addr, c);
}
let conn_ress = self.ca_conn_ress.get_mut(&cmd.addr).unwrap();
let cmd = ConnCommand::channel_add(cmd.name, cmd.cssid);
conn_ress.cmd_queue.push_back(cmd);
self.have_conn_command = true;
}
}
}
let conn_ress = self.ca_conn_ress.get_mut(&add.addr).unwrap();
let cmd = ConnCommand::channel_add(add.name, add.cssid);
// TODO not the nicest
let tx = conn_ress.sender.clone();
tokio::spawn(async move { tx.send(cmd).await });
Ok(())
}
fn handle_remove_channel(&mut self, add: ChannelRemove) -> Result<(), Error> {
fn handle_remove_channel(&mut self, cmd: ChannelRemove) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
}
let ch = Channel::new(add.name);
let ch = Channel::new(cmd.name);
if let Some(k) = self.channel_states.inner().get_mut(&ch) {
match &k.value {
ChannelStateValue::Active(j) => match j {
@@ -595,6 +616,7 @@ impl CaConnSet {
if self.shutdown_stopping {
return Ok(());
}
self.stats.ioc_addr_found().inc();
trace3!("handle_ioc_query_result");
for e in res {
let ch = Channel::new(e.channel.clone());
@@ -619,7 +641,6 @@ impl CaConnSet {
addr,
state: WithAddressState::Unassigned { since },
};
// TODO move state change also in there?
self.handle_add_channel_with_addr(add)?;
} else {
trace3!("ioc not found {e:?}");
@@ -647,6 +668,8 @@ impl CaConnSet {
.trigger("msg", &[&self.storage_insert_sender.len()]);
debug!("TODO handle_check_health");
self.check_channel_states()?;
// Trigger already the next health check, but use the current data that we have.
// TODO try to deliver a command to CaConn
@@ -657,6 +680,7 @@ impl CaConnSet {
let item = ConnCommand::check_health();
res.cmd_queue.push_back(item);
}
self.have_conn_command = true;
let ts2 = Instant::now();
let item = CaConnSetItem::Healthy(ts1, ts2);
@@ -670,23 +694,14 @@ impl CaConnSet {
}
debug!("handle_channel_statuses_req");
let reg1 = regex::Regex::new(&req.name)?;
let channels_ca_conn = self
.ca_conn_channel_states
.iter()
.filter(|x| reg1.is_match(x.0))
.map(|(k, v)| (k.to_string(), v.clone()))
.collect();
let channels_ca_conn_set = self
.channel_states
.inner()
.iter()
.filter(|(k, v)| reg1.is_match(k.id()))
.filter(|(k, _)| reg1.is_match(k.id()))
.map(|(k, v)| (k.id().to_string(), v.clone()))
.collect();
let item = ChannelStatusesResponse {
channels_ca_conn,
channels_ca_conn_set,
};
let item = ChannelStatusesResponse { channels_ca_conn_set };
if req.tx.try_send(item).is_err() {
self.stats.response_tx_fail.inc();
}
@@ -701,7 +716,7 @@ impl CaConnSet {
self.shutdown_stopping = true;
self.channel_info_query_sender.drop();
self.find_ioc_query_sender.drop();
for (addr, res) in self.ca_conn_ress.iter() {
for (_addr, res) in self.ca_conn_ress.iter() {
let item = ConnCommand::shutdown();
// TODO not the nicest
let tx = res.sender.clone();
@@ -713,14 +728,51 @@ impl CaConnSet {
fn handle_conn_command_result(&mut self, addr: SocketAddr, res: ConnCommandResult) -> Result<(), Error> {
use crate::ca::conn::ConnCommandResultKind::*;
match res.kind {
CheckHealth(health) => {
// debug!("handle_conn_command_result {addr}");
for (k, v) in health.channel_statuses {
self.ca_conn_channel_states.insert(k, v);
CheckHealth(res) => self.apply_ca_conn_health_update(addr, res),
}
}
fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: CheckHealthResult) -> Result<(), Error> {
let tsnow = SystemTime::now();
self.rogue_channel_count = 0;
for (k, v) in res.channel_statuses {
let ch = Channel::new(k);
if let Some(st1) = self.channel_states.inner().get_mut(&ch) {
if let ChannelStateValue::Active(st2) = &mut st1.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state: st3,
} = st2
{
if let WithStatusSeriesIdStateInner::WithAddress {
addr: conn_addr,
state: st4,
} = &mut st3.inner
{
if SocketAddr::V4(*conn_addr) != addr {
self.rogue_channel_count += 1;
}
if let WithAddressState::Assigned(st5) = st4 {
st5.updated = tsnow;
st5.value = ConnectionStateValue::ChannelStateInfo(v);
} else {
self.rogue_channel_count += 1;
}
} else {
self.rogue_channel_count += 1;
}
} else {
self.rogue_channel_count += 1;
}
} else {
self.rogue_channel_count += 1;
}
Ok(())
} else {
self.rogue_channel_count += 1;
}
}
self.stats.channel_rogue.set(self.rogue_channel_count);
Ok(())
}
fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> {
@@ -733,13 +785,11 @@ impl CaConnSet {
warn!("end-of-stream received for non-existent CaConn {addr}");
}
self.remove_status_for_addr(addr)?;
debug!("still CaConn left {}", self.ca_conn_ress.len());
trace2!("still CaConn left {}", self.ca_conn_ress.len());
Ok(())
}
fn remove_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> {
self.ca_conn_channel_states
.retain(|_k, v| SocketAddr::V4(v.addr) != addr);
Ok(())
}
@@ -764,7 +814,6 @@ impl CaConnSet {
} else {
return Err(Error::with_msg_no_trace("only ipv4 for epics"));
};
debug!("create new CaConn {:?}", addr);
let conn = CaConn::new(
opts,
add.backend.clone(),
@@ -838,7 +887,7 @@ impl CaConnSet {
},
))
.await?;
trace!("ca_conn_consumer signaled {}", addr);
trace2!("ca_conn_consumer signaled {}", addr);
ret
}
@@ -971,8 +1020,10 @@ impl CaConnSet {
Ok(())
}
async fn check_channel_states(&mut self) -> Result<(), Error> {
fn check_channel_states(&mut self) -> Result<(), Error> {
let (mut search_pending_count,) = self.update_channel_state_counts();
let mut cmd_remove_channel = Vec::new();
let mut cmd_add_channel = Vec::new();
let k = self.chan_check_next.take();
let it = if let Some(last) = k {
trace!("check_chans start at {:?}", last);
@@ -980,13 +1031,14 @@ impl CaConnSet {
} else {
self.channel_states.inner().range_mut(..)
};
let tsnow = SystemTime::now();
let mut attempt_series_search = true;
for (i, (ch, st)) in it.enumerate() {
match &mut st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { since: _ } => {
todo!()
// TODO no longer used? remove?
self.stats.logic_error().inc();
}
ActiveChannelState::WaitForStatusSeriesId { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
@@ -1013,60 +1065,38 @@ impl CaConnSet {
}
}
WithStatusSeriesIdStateInner::SearchPending { since } => {
//info!("SearchPending {} {:?}", i, ch);
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > SEARCH_PENDING_TIMEOUT {
info!("Search timeout for {ch:?}");
debug!("TODO should receive some error indication instead of timeout for {ch:?}");
state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow };
search_pending_count -= 1;
}
}
WithStatusSeriesIdStateInner::WithAddress { addr: addr_v4, state } => {
//info!("WithAddress {} {:?}", i, ch);
WithStatusSeriesIdStateInner::WithAddress {
addr: addr_v4,
state: st3,
} => {
use WithAddressState::*;
match state {
match st3 {
Unassigned { since } => {
// TODO do I need this case anymore?
#[cfg(DISABLED)]
if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow {
let backend = self.backend.clone();
let addr = SocketAddr::V4(*addr_v4);
let name = ch.id().into();
let cssid = status_series_id.clone();
let local_epics_hostname = self.local_epics_hostname.clone();
// This operation is meant to complete very quickly
let add = ChannelAdd {
backend: backend,
name: name,
addr,
cssid,
local_epics_hostname,
if *since + CHANNEL_UNASSIGNED_TIMEOUT < tsnow {
let cmd = ChannelAddWithAddr {
backend: self.backend.clone(),
name: ch.id().into(),
local_epics_hostname: self.local_epics_hostname.clone(),
cssid: status_series_id.clone(),
addr: SocketAddr::V4(*addr_v4),
};
self.handle_add_channel(add).await?;
let cs = ConnectionState {
updated: tsnow,
value: ConnectionStateValue::Unconnected,
};
// TODO if a matching CaConn does not yet exist, it gets created
// via the command through the channel, so we can not await it here.
// Therefore, would be good to have a separate status entry out of
// the ca_conn_ress right here in a sync fashion.
*state = WithAddressState::Assigned(cs);
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: tsnow,
series: SeriesId::new(status_series_id.id()),
status: scywr::iteminsertqueue::ChannelStatus::AssignedToAddress,
});
match self.storage_insert_tx.send(item).await {
Ok(_) => {}
Err(_) => {
// TODO feed into throttled log, or count as unlogged
}
}
cmd_add_channel.push(cmd);
}
}
Assigned(_) => {
// TODO check if channel is healthy and alive
Assigned(st4) => {
if st4.updated + CHANNEL_HEALTH_TIMEOUT < tsnow {
self.stats.channel_health_timeout().inc();
let addr = SocketAddr::V4(*addr_v4);
cmd_remove_channel.push((addr, ch.clone()));
*st3 = WithAddressState::Unassigned { since: tsnow };
}
}
}
}
@@ -1087,17 +1117,27 @@ impl CaConnSet {
break;
}
}
for (addr, ch) in cmd_remove_channel {
if let Some(g) = self.ca_conn_ress.get_mut(&addr) {
let cmd = ConnCommand::channel_remove(ch.id().into());
g.cmd_queue.push_back(cmd);
self.have_conn_command = true;
}
}
for cmd in cmd_add_channel {
self.handle_add_channel_with_addr(cmd)?;
}
Ok(())
}
// TODO should use both counters and values
fn update_channel_state_counts(&mut self) -> (u64,) {
return (0,);
let mut unknown_address = 0;
let mut search_pending = 0;
let mut no_address = 0;
let mut unassigned = 0;
let mut assigned = 0;
let mut no_address = 0;
let mut connected = 0;
for (_ch, st) in self.channel_states.inner().iter() {
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
@@ -1118,9 +1158,14 @@ impl CaConnSet {
WithAddressState::Unassigned { .. } => {
unassigned += 1;
}
WithAddressState::Assigned(_) => {
assigned += 1;
}
WithAddressState::Assigned(st3) => match &st3.value {
ConnectionStateValue::Unknown => {
assigned += 1;
}
ConnectionStateValue::ChannelStateInfo(_) => {
connected += 1;
}
},
},
WithStatusSeriesIdStateInner::NoAddress { .. } => {
no_address += 1;
@@ -1128,39 +1173,42 @@ impl CaConnSet {
},
},
ChannelStateValue::ToRemove { .. } => {
unknown_address += 1;
unassigned += 1;
}
}
}
self.stats.channel_unknown_address.__set(unknown_address);
self.stats.channel_search_pending.__set(search_pending);
self.stats.channel_no_address.__set(no_address);
self.stats.channel_unassigned.__set(unassigned);
self.stats.channel_assigned.__set(assigned);
self.stats.channel_unknown_address.set(unknown_address);
self.stats.channel_search_pending.set(search_pending);
self.stats.channel_no_address.set(no_address);
self.stats.channel_unassigned.set(unassigned);
self.stats.channel_assigned.set(assigned);
self.stats.channel_connected.set(connected);
(search_pending,)
}
fn try_push_ca_conn_cmds(&mut self) {
// debug!("try_push_ca_conn_cmds");
for (_, v) in self.ca_conn_ress.iter_mut() {
loop {
break if let Some(item) = v.cmd_queue.pop_front() {
match v.sender.try_send(item) {
Ok(()) => continue,
Err(e) => match e {
async_channel::TrySendError::Full(e) => {
self.stats.try_push_ca_conn_cmds_full.inc();
v.cmd_queue.push_front(e);
break;
}
async_channel::TrySendError::Closed(_) => {
// TODO
self.stats.try_push_ca_conn_cmds_closed.inc();
break;
}
},
}
};
if self.have_conn_command {
self.have_conn_command = false;
for (_, v) in self.ca_conn_ress.iter_mut() {
loop {
break if let Some(item) = v.cmd_queue.pop_front() {
match v.sender.try_send(item) {
Ok(()) => continue,
Err(e) => match e {
async_channel::TrySendError::Full(e) => {
self.stats.try_push_ca_conn_cmds_full.inc();
v.cmd_queue.push_front(e);
self.have_conn_command = true;
}
async_channel::TrySendError::Closed(_) => {
// TODO
self.stats.try_push_ca_conn_cmds_closed.inc();
self.have_conn_command = true;
}
},
}
};
}
}
}
}
@@ -1210,19 +1258,19 @@ impl Stream for CaConnSet {
Ready(x) => {
let addr = *addr;
self.await_ca_conn_jhs.pop_front();
debug!("await_ca_conn_jhs still jhs left {}", self.await_ca_conn_jhs.len());
let left = self.await_ca_conn_jhs.len();
match x {
Ok(Ok(())) => {
self.stats.ca_conn_task_join_done_ok.inc();
debug!("CaConn {addr} finished well");
debug!("CaConn {addr} finished well left {left}");
}
Ok(Err(e)) => {
self.stats.ca_conn_task_join_done_err.inc();
error!("CaConn {addr} task error: {e}");
error!("CaConn {addr} task error: {e} left {left}");
}
Err(e) => {
self.stats.ca_conn_task_join_err.inc();
error!("CaConn {addr} join error: {e}");
error!("CaConn {addr} join error: {e} left {left}");
}
}
have_progress = true;
@@ -1233,7 +1281,6 @@ impl Stream for CaConnSet {
}
}
// TODO should never send from here, track.
if self.storage_insert_sender.is_idle() {
if let Some(item) = self.storage_insert_queue.pop_front() {
self.stats.logic_error().inc();

View File

@@ -1,9 +1,7 @@
use crate::ca::conn::ChannelStateInfo;
use crate::daemon_common::Channel;
use async_channel::Receiver;
use serde::Serialize;
use series::series::Existence;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::collections::BTreeMap;
use std::net::SocketAddrV4;
use std::time::Instant;
@@ -35,13 +33,13 @@ impl CaConnState {
#[derive(Debug, Clone, Serialize)]
pub enum ConnectionStateValue {
Unconnected,
Connected,
Unknown,
ChannelStateInfo(ChannelStateInfo),
}
#[derive(Debug, Clone, Serialize)]
pub struct ConnectionState {
//#[serde(with = "serde_Instant")]
#[serde(with = "humantime_serde")]
pub updated: SystemTime,
pub value: ConnectionStateValue,
}
@@ -50,6 +48,7 @@ pub struct ConnectionState {
pub enum WithAddressState {
Unassigned {
//#[serde(with = "serde_Instant")]
#[serde(with = "humantime_serde")]
since: SystemTime,
},
Assigned(ConnectionState),
@@ -58,10 +57,11 @@ pub enum WithAddressState {
#[derive(Debug, Clone, Serialize)]
pub enum WithStatusSeriesIdStateInner {
UnknownAddress {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
SearchPending {
//#[serde(with = "serde_Instant")]
#[serde(with = "humantime_serde")]
since: SystemTime,
},
WithAddress {
@@ -69,6 +69,7 @@ pub enum WithStatusSeriesIdStateInner {
state: WithAddressState,
},
NoAddress {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
}
@@ -81,9 +82,11 @@ pub struct WithStatusSeriesIdState {
#[derive(Debug, Clone, Serialize)]
pub enum ActiveChannelState {
Init {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
WaitForStatusSeriesId {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
WithStatusSeriesId {

View File

@@ -3,6 +3,7 @@ use crate::ca::connset::CaConnSetEvent;
use crate::ca::connset::ChannelStatusesRequest;
use crate::ca::connset::ChannelStatusesResponse;
use crate::ca::connset::ConnSetCmd;
use crate::ca::statemap::ChannelState;
use crate::daemon_common::DaemonEvent;
use async_channel::Receiver;
use async_channel::Sender;
@@ -22,6 +23,7 @@ use stats::CaProtoStats;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::SeriesByChannelStats;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::SocketAddrV4;
use std::sync::atomic::AtomicU64;
@@ -134,11 +136,12 @@ async fn channel_state(
panic!("TODO");
}
// axum::Json<ChannelStatusesResponse>
// ChannelStatusesResponse
// BTreeMap<String, ChannelState>
async fn channel_states(
params: HashMap<String, String>,
tx: Sender<CaConnSetEvent>,
) -> axum::Json<ChannelStatusesResponse> {
) -> axum::Json<BTreeMap<String, ChannelState>> {
let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string();
let limit = params
.get("limit")
@@ -149,17 +152,9 @@ async fn channel_states(
let req = ChannelStatusesRequest { name, limit, tx: tx2 };
let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req));
// TODO handle error
tx.send(item).await;
tx.send(item).await.unwrap();
let res = rx2.recv().await.unwrap();
// match serde_json::to_string(&res) {
// Ok(x) => x,
// Err(e) => {
// error!("Serialize error {e}");
// Err::<(), _>(e).unwrap();
// panic!();
// }
// }
axum::Json(res)
axum::Json(res.channels_ca_conn_set)
}
async fn extra_inserts_conf_set(v: ExtraInsertsConf, dcom: Arc<DaemonComm>) -> axum::Json<bool> {

View File

@@ -5,11 +5,12 @@ authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
futures-util = "0.3"
futures-util = "0.3.28"
async-channel = "1.9.0"
scylla = "0.9.0"
smallvec = "1.11"
smallvec = "1.11.0"
pin-project = "1.1.3"
stackfuture = "0.3.0"
log = { path = "../log" }
stats = { path = "../stats" }
series = { path = "../series" }

View File

@@ -17,6 +17,7 @@ use scylla::QueryResult;
use series::SeriesId;
use smallvec::smallvec;
use smallvec::SmallVec;
use stackfuture::StackFuture;
use stats::InsertWorkerStats;
use std::net::SocketAddrV4;
use std::pin::Pin;
@@ -289,12 +290,15 @@ where
InsertFut::new(scy, qu, params)
}
#[pin_project::pin_project]
pub struct InsertFut {
#[allow(unused)]
scy: Arc<ScySession>,
#[allow(unused)]
qu: Arc<PreparedStatement>,
fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send>>,
// #[pin]
// fut: StackFuture<'static, Result<QueryResult, QueryError>, { 1024 * 3 }>,
}
impl InsertFut {
@@ -308,6 +312,7 @@ impl InsertFut {
let fut = scy_ref.execute_paged(qu_ref, params, None);
let fut = taskrun::tokio::task::unconstrained(fut);
let fut = Box::pin(fut);
// let fut = StackFuture::from(fut);
Self { scy, qu, fut }
}
}
@@ -315,8 +320,9 @@ impl InsertFut {
impl Future for InsertFut {
type Output = Result<QueryResult, QueryError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.fut.poll_unpin(cx)
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
this.fut.poll_unpin(cx)
}
}

View File

@@ -216,13 +216,13 @@ stats_proc::stats_struct!((
),
stats_struct(
name(CaConnSetStats),
prefix(connset),
counters(
channel_unknown_address,
channel_search_pending,
channel_no_address,
channel_with_address,
channel_unassigned,
channel_assigned,
channel_add,
channel_status_series_found,
channel_health_timeout,
ioc_search_start,
ioc_addr_found,
ca_conn_task_join_done_ok,
ca_conn_task_join_done_err,
ca_conn_task_join_err,
@@ -231,8 +231,6 @@ stats_proc::stats_struct!((
response_tx_fail,
try_push_ca_conn_cmds_full,
try_push_ca_conn_cmds_closed,
channel_wait_for_status_id,
channel_wait_for_address,
logic_error,
ready_for_end_of_stream,
ready_for_end_of_stream_with_progress,
@@ -241,7 +239,6 @@ stats_proc::stats_struct!((
poll_pending,
poll_reloop,
poll_no_progress_no_pending,
test_1,
),
values(
storage_insert_tx_len,
@@ -250,6 +247,13 @@ stats_proc::stats_struct!((
channel_info_res_tx_len,
find_ioc_query_sender_len,
ca_conn_res_tx_len,
channel_unknown_address,
channel_search_pending,
channel_no_address,
channel_unassigned,
channel_assigned,
channel_connected,
channel_rogue,
),
),
// agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)),