State transitions

This commit is contained in:
Dominik Werder
2024-01-24 16:32:16 +01:00
parent 403f0b37a3
commit edbbb4f751
7 changed files with 535 additions and 512 deletions

View File

@@ -39,6 +39,7 @@ serieswriter = { path = "../serieswriter" }
stats = { path = "../stats" }
scywr = { path = "../scywr" }
dbpg = { path = "../dbpg" }
serde_helper = { path = "../serde_helper" }
ingest-linux = { path = "../ingest-linux" }
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }

View File

@@ -9,7 +9,8 @@ use async_channel::Receiver;
use async_channel::Sender;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::Error;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -97,6 +98,30 @@ macro_rules! trace4 {
};
}
/// Errors raised by the connection handling code in this module (`CaConn`).
///
/// `Protocol`, `Writer` and `CommonError` wrap errors of other modules through `#[from]`,
/// so `?` converts them automatically. All other variants are constructed directly in
/// this file.
#[derive(Debug, ThisError)]
pub enum Error {
/// Connecting failed; also sent out as `CaConnEventValue::Error(Error::ConnectFail)`.
ConnectFail,
/// `self.proto` was `None` at a point where a protocol instance is required
/// (see `handle_peer_ready`).
NoProto,
/// Error coming from the protocol layer (`crate::ca::proto`).
Protocol(#[from] crate::ca::proto::Error),
/// Error coming from the series writer (e.g. `writer.write(..)` or `writer.tick(..)`).
Writer(#[from] serieswriter::writer::Error),
/// A lookup keyed by the given `Cid` found no entry (channel state or channel name).
UnknownCid(Cid),
/// `handle_create_chan_res` could not find a channel name for the given `Cid`.
NoNameForCid(Cid),
/// `handle_create_chan_res` found the channel in a state it does not handle.
CreateChannelBadState,
/// Wrapped error of the common `err::Error` type.
CommonError(#[from] err::Error),
/// The inner connection-state handler yielded `Ready(None)` where the caller does not
/// expect it.
LoopInnerLogicError,
/// `attempt_flush_queue`: the sender-polling helper has no sender.
NoSender,
/// `attempt_flush_queue`: polled for a send result while no send is in progress.
NotSending,
/// `attempt_flush_queue`: the channel being sent on is closed.
ClosedSending,
/// The poll loop made no progress and nothing is pending.
NoProgressNoPending,
/// During shutdown: output queues are not flushed, yet there is neither progress nor
/// anything pending.
ShutdownWithQueuesNoProgressNoPending,
}
impl err::ToErr for Error {
fn to_err(self) -> err::Error {
err::Error::with_msg_no_trace(self.to_string())
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ChannelConnectedInfo {
Disconnected,
@@ -415,9 +440,9 @@ enum CaConnState {
Init,
Handshake,
PeerReady,
Wait(Pin<Box<dyn Future<Output = ()> + Send>>),
Shutdown,
EndOfStream,
Error,
}
impl fmt::Debug for CaConnState {
@@ -428,9 +453,9 @@ impl fmt::Debug for CaConnState {
Self::Init => write!(fmt, "Init"),
Self::Handshake => write!(fmt, "Handshake"),
Self::PeerReady => write!(fmt, "PeerReady"),
Self::Wait(_) => fmt.debug_tuple("Wait").finish(),
Self::Shutdown => write!(fmt, "Shutdown"),
Self::EndOfStream => write!(fmt, "EndOfStream"),
Self::Error => write!(fmt, "Error"),
}
}
}
@@ -600,10 +625,9 @@ pub enum CaConnEventValue {
EchoTimeout,
ConnCommandResult(ConnCommandResult),
ChannelStatus(ChannelStatusPartial),
QueryItem(QueryItem),
ChannelCreateFail(String),
Error(Error),
EndOfStream,
ConnectFail,
}
#[derive(Debug)]
@@ -787,7 +811,7 @@ impl CaConn {
debug!("emit status ConnectFail");
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::ConnectFail,
value: CaConnEventValue::Error(Error::ConnectFail),
};
self.ca_conn_event_out_queue.push_back(item);
}
@@ -974,13 +998,9 @@ impl CaConn {
let jobid = res.0;
// by convention:
let cid = Cid(jobid.0 as _);
match res.1 {
Ok(wr) => {
self.handle_writer_establish_inner(cid, wr)?;
Ok(Ready(Some(())))
}
Err(e) => Err(Error::from_string(e.to_string())),
}
let wr = res.1?;
self.handle_writer_establish_inner(cid, wr)?;
Ok(Ready(Some(())))
}
Ready(None) => {
error!("writer_establish queue closed");
@@ -1010,41 +1030,57 @@ impl CaConn {
});
self.insert_item_queue.push_back(item);
}
let subid = {
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
trace!(
"new {:?} for {:?} chst {:?} {:?}",
subid,
cid,
st2.channel.cid,
st2.channel.sid
);
subid
};
{
trace!("send out EventAdd for {cid:?}");
let ty = CaMsgTy::EventAdd(EventAdd {
sid: st2.channel.sid.to_u32(),
data_type: st2.channel.ca_dbr_type,
data_count: st2.channel.ca_dbr_count,
subid: subid.to_u32(),
});
let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow);
let proto = self.proto.as_mut().unwrap();
proto.push_out(msg);
}
let created_state = WritableState {
tsbeg: self.poll_tsnow,
channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()),
writer,
reading: ReadingState::EnableMonitoring(EnableMonitoringState {
let name = self.name_by_cid.get(&st2.channel.cid).map(|x| x.as_str()).unwrap_or("");
if name.starts_with("TEST:PEAKING:") {
let created_state = WritableState {
tsbeg: self.poll_tsnow,
subid,
}),
};
*chst = ChannelState::Writable(created_state);
Ok(())
channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()),
writer,
reading: ReadingState::Polling(PollingState {
tsbeg: self.poll_tsnow,
poll_ivl: Duration::from_millis(1000),
tick: PollTickState::Idle(self.poll_tsnow),
}),
};
*chst = ChannelState::Writable(created_state);
Ok(())
} else {
let subid = {
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
trace!(
"new {:?} for {:?} chst {:?} {:?}",
subid,
cid,
st2.channel.cid,
st2.channel.sid
);
subid
};
{
trace!("send out EventAdd for {cid:?}");
let ty = CaMsgTy::EventAdd(EventAdd {
sid: st2.channel.sid.to_u32(),
data_type: st2.channel.ca_dbr_type,
data_count: st2.channel.ca_dbr_count,
subid: subid.to_u32(),
});
let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow);
let proto = self.proto.as_mut().unwrap();
proto.push_out(msg);
}
let created_state = WritableState {
tsbeg: self.poll_tsnow,
channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()),
writer,
reading: ReadingState::EnableMonitoring(EnableMonitoringState {
tsbeg: self.poll_tsnow,
subid,
}),
};
*chst = ChannelState::Writable(created_state);
Ok(())
}
} else {
warn!("TODO handle_series_lookup_result channel in bad state, reset");
Ok(())
@@ -1249,9 +1285,8 @@ impl CaConn {
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
*x
} else {
let e = Error::with_msg_no_trace("unknown {subid:?}");
error!("{e}");
return Err(e);
self.stats.unknown_subid().inc();
return Ok(());
};
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
x
@@ -1263,9 +1298,7 @@ impl CaConn {
// If we don't have it in the "closed" btree, then close connection to the IOC and count
// as logic error.
// Close connection to the IOC. Count as logic error.
let e = Error::with_msg_no_trace(format!(
"TODO handle_event_add_res can not find channel for {cid:?} {subid:?}"
));
let e = Error::UnknownCid(cid);
error!("{e}");
return Err(e);
};
@@ -1317,7 +1350,7 @@ impl CaConn {
ChannelState::Writable(st) => {
let stnow = self.tmp_ts_poll;
let crst = &mut st.channel;
let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 10;
let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 4;
if crst.stwin_ts != stwin_ts {
crst.stwin_ts = stwin_ts;
crst.stwin_count = 0;
@@ -1325,15 +1358,24 @@ impl CaConn {
{
crst.stwin_count += 1;
crst.stwin_bytes += ev.payload_len;
if crst.stwin_count > 5 || crst.stwin_bytes > 1024 * 1024 * 1 {
if crst.stwin_count > 30000 || crst.stwin_bytes > 1024 * 1024 * 500 {
let subid = match &mut st.reading {
ReadingState::EnableMonitoring(x) => Some(x.subid.clone()),
ReadingState::Monitoring(x) => Some(x.subid.clone()),
ReadingState::StopMonitoringForPolling(_) => None,
ReadingState::Polling(_) => None,
ReadingState::StopMonitoringForPolling(_) => {
self.stats.transition_to_polling_bad_state().inc();
None
}
ReadingState::Polling(_) => {
self.stats.transition_to_polling_already_in().inc();
None
}
};
if let Some(subid) = subid {
self.stats.transition_to_polling().inc();
self.transition_to_polling(subid, tsnow)?;
} else {
self.stats.transition_to_polling_bad_state().inc();
}
return Ok(());
}
@@ -1413,7 +1455,7 @@ impl CaConn {
ReadingState::StopMonitoringForPolling(..) => {
st.reading = ReadingState::Polling(PollingState {
tsbeg: tsnow,
poll_ivl: Duration::from_millis(2000),
poll_ivl: Duration::from_millis(1000),
tick: PollTickState::Idle(tsnow),
});
}
@@ -1511,7 +1553,8 @@ impl CaConn {
}
}
} else {
warn!("unknown {ioid:?}");
// warn!("unknown {ioid:?}");
self.stats.unknown_ioid().inc();
}
Ok(())
}
@@ -1552,35 +1595,36 @@ impl CaConn {
Self::check_ev_value_data(&value.data, writer.scalar_type())?;
{
let val: DataValue = value.data.into();
writer
.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq)
.map_err(|e| Error::from_string(e))?;
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq)?;
}
Ok(())
} else {
stats.channel_fast_item_drop.inc();
if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) {
crst.insert_recv_ivl_last = tsnow;
let ema = crst.insert_item_ivl_ema.ema();
let item = IvlItem {
series: series.clone(),
ts,
ema: ema.ema(),
emd: ema.emv().sqrt(),
};
iiq.push_back(QueryItem::Ivl(item));
// TODO
if false {
if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) {
crst.insert_recv_ivl_last = tsnow;
let ema = crst.insert_item_ivl_ema.ema();
let item = IvlItem {
series: series.clone(),
ts,
ema: ema.ema(),
emd: ema.emv().sqrt(),
};
iiq.push_back(QueryItem::Ivl(item));
}
if false && crst.muted_before == 0 {
let ema = crst.insert_item_ivl_ema.ema();
let item = MuteItem {
series: series.clone(),
ts,
ema: ema.ema(),
emd: ema.emv().sqrt(),
};
iiq.push_back(QueryItem::Mute(item));
}
crst.muted_before = 1;
}
if false && crst.muted_before == 0 {
let ema = crst.insert_item_ivl_ema.ema();
let item = MuteItem {
series: series.clone(),
ts,
ema: ema.ema(),
emd: ema.emv().sqrt(),
};
iiq.push_back(QueryItem::Mute(item));
}
crst.muted_before = 1;
Ok(())
}
}
@@ -1637,7 +1681,7 @@ impl CaConn {
}
CaItem::Msg(msg) => match msg.ty {
CaMsgTy::VersionRes(n) => {
debug!("see incoming {:?} {:?}", self.remote_addr_dbg, msg);
// debug!("see incoming {:?} {:?}", self.remote_addr_dbg, msg);
if n < 12 || n > 13 {
error!("See some unexpected version {n} channel search may not work.");
Ready(Some(Ok(())))
@@ -1657,13 +1701,13 @@ impl CaConn {
},
Err(e) => {
error!("got error item from CaProto {e:?}");
Ready(Some(Err(e.to_string().into())))
Ready(Some(Err(e.into())))
}
},
Ready(None) => {
warn!("handle_conn_listen CaProto is done {:?}", self.remote_addr_dbg);
self.state = CaConnState::Wait(wait_fut(self.backoff_next()));
self.proto = None;
self.state = CaConnState::EndOfStream;
Ready(None)
}
Pending => Pending,
@@ -1681,9 +1725,7 @@ impl CaConn {
match self.channels.get(&cid).unwrap() {
ChannelState::Init(cssid) => {
let cssid = cssid.clone();
let name = self
.name_by_cid(cid)
.ok_or_else(|| Error::with_msg_no_trace("name for cid not known"));
let name = self.name_by_cid(cid).ok_or_else(|| Error::UnknownCid(cid));
let name = match name {
Ok(k) => k.to_string(),
Err(e) => return Err(e),
@@ -1745,6 +1787,7 @@ impl CaConn {
do_wake_again = true;
self.proto.as_mut().unwrap().push_out(msg);
st3.tick = PollTickState::Wait(tsnow, ioid);
self.stats.caget_issued().inc();
}
}
PollTickState::Wait(x, ioid) => {
@@ -1753,6 +1796,7 @@ impl CaConn {
self.stats.caget_timeout().inc();
// warn!("channel caget timeout");
// std::process::exit(1);
st3.tick = PollTickState::Idle(tsnow);
}
}
},
@@ -1783,7 +1827,7 @@ impl CaConn {
let proto = if let Some(x) = self.proto.as_mut() {
x
} else {
return Ready(Some(Err(Error::with_msg_no_trace("handle_peer_ready but no proto"))));
return Ready(Some(Err(Error::NoProto)));
};
let res = match proto.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
@@ -1897,7 +1941,7 @@ impl CaConn {
}
Pending => Pending,
};
res.map_err(|e| Error::from(e.to_string()))
res.map_err(Into::into)
}
fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> {
@@ -1909,7 +1953,7 @@ impl CaConn {
let name = if let Some(x) = name_by_cid.get(&cid) {
x.to_string()
} else {
return Err(Error::with_msg_no_trace(format!("no name for {cid:?}")));
return Err(Error::NoNameForCid(cid));
};
trace!("handle_create_chan_res {k:?} {name:?}");
// TODO handle not-found error:
@@ -1919,7 +1963,7 @@ impl CaConn {
_ => {
// TODO handle in better way:
// Remove channel and emit notice that channel is removed with reason.
let e = Error::with_msg_no_trace("handle_peer_ready bad state");
let e = Error::CreateChannelBadState;
return Err(e);
}
};
@@ -1974,8 +2018,8 @@ impl CaConn {
fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow<Poll<Result<(), Error>>> {
use ControlFlow::*;
use Poll::*;
let e = Error::with_msg_no_trace(format!("test"));
//Err(e)?;
let e = Error::CreateChannelBadState;
// Err(e)?;
let _ = e;
Break(Pending)
}
@@ -2018,43 +2062,28 @@ impl CaConn {
Ok(Ready(Some(())))
}
Ok(Err(e)) => {
trace!("error connect to {addr} {e}");
if true {
let addr = addr.clone();
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectError,
},
));
self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail);
} else {
// TODO log with exponential backoff
let dt = self.backoff_next();
self.state = CaConnState::Wait(wait_fut(dt));
self.proto = None;
}
debug!("error connect to {addr} {e}");
let addr = addr.clone();
self.insert_item_queue
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectError,
}));
self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail);
Ok(Ready(Some(())))
}
Err(e) => {
// TODO log with exponential backoff
trace!("timeout connect to {addr} {e}");
if true {
let addr = addr.clone();
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectTimeout,
},
));
self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail);
} else {
let dt = self.backoff_next();
self.state = CaConnState::Wait(wait_fut(dt));
self.proto = None;
}
debug!("timeout connect to {addr} {e}");
let addr = addr.clone();
self.insert_item_queue
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectTimeout,
}));
self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail);
Ok(Ready(Some(())))
}
}
@@ -2096,25 +2125,9 @@ impl CaConn {
Pending => Ok(Pending),
}
}
CaConnState::Wait(inst) => {
trace4!("Wait");
match inst.poll_unpin(cx) {
Ready(_) => {
self.state = CaConnState::Unconnected(Instant::now());
self.proto = None;
Ok(Ready(Some(())))
}
Pending => Ok(Pending),
}
}
CaConnState::Shutdown => {
trace4!("Shutdown");
Ok(Ready(None))
}
CaConnState::EndOfStream => {
trace4!("EndOfStream");
Ok(Ready(None))
}
CaConnState::Shutdown => Ok(Ready(None)),
CaConnState::EndOfStream => Ok(Ready(None)),
CaConnState::Error => Ok(Ready(None)),
}
}
@@ -2137,7 +2150,7 @@ impl CaConn {
}
Ready(None) => {
error!("handle_conn_state yields {x:?}");
return Err(Error::with_msg_no_trace("logic error"));
return Err(Error::LoopInnerLogicError);
}
Pending => return Ok(Pending),
},
@@ -2196,9 +2209,9 @@ impl CaConn {
CaConnState::Init => {}
CaConnState::Handshake => {}
CaConnState::PeerReady => {}
CaConnState::Wait(_) => {}
CaConnState::Shutdown => {}
CaConnState::EndOfStream => {}
CaConnState::Error => {}
}
Ok(())
}
@@ -2231,9 +2244,7 @@ impl CaConn {
fn tick_writers(&mut self) -> Result<(), Error> {
for (k, st) in &mut self.channels {
if let ChannelState::Writable(st2) = st {
st2.writer
.tick(&mut self.insert_item_queue)
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
st2.writer.tick(&mut self.insert_item_queue)?;
}
}
Ok(())
@@ -2244,13 +2255,15 @@ impl CaConn {
}
fn queues_out_flushed(&self) -> bool {
self.queues_async_out_flushed() && self.ca_conn_event_out_queue.is_empty()
}
fn queues_async_out_flushed(&self) -> bool {
// self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle()
// TODO re-enable later
self.insert_item_queue.is_empty() && self.storage_insert_sender.is_idle()
debug!(
"async out flushed iiq {} {} caout {}",
self.insert_item_queue.is_empty(),
self.storage_insert_sender.is_idle(),
self.ca_conn_event_out_queue.is_empty()
);
self.insert_item_queue.is_empty()
&& self.storage_insert_sender.is_idle()
&& self.ca_conn_event_out_queue.is_empty()
}
fn attempt_flush_queue<T, Q, FB, FS>(
@@ -2276,7 +2289,7 @@ impl CaConn {
break;
}
if !sp.has_sender() {
return Err(Error::with_msg_no_trace(format!("flush queue {id} no sender")));
return Err(Error::NoSender);
}
if sp.is_idle() {
if let Some(item) = qu_to_si(qu) {
@@ -2292,16 +2305,18 @@ impl CaConn {
have_progress = true;
}
Ready(Err(e)) => {
let e = Error::with_msg_no_trace(format!("flush queue {id} {e}"));
return Err(e);
use crate::senderpolling::Error as SpErr;
match e {
SpErr::NoSendInProgress => return Err(Error::NotSending),
SpErr::Closed(_) => return Err(Error::ClosedSending),
}
}
Pending => {
return Ok(Pending);
}
}
} else {
let e = Error::with_msg_no_trace(format!("flush queue {id} not sending"));
return Err(e);
return Err(Error::NotSending);
}
}
if have_progress {
@@ -2390,22 +2405,11 @@ impl Stream for CaConn {
}
{
// TODO use stats histogram type to test the native prometheus histogram feature
let qu = &self.insert_item_queue;
let stats = &self.stats;
let n = qu.len();
if n >= 128 {
stats.storage_queue_above_128().inc();
} else if n >= 32 {
stats.storage_queue_above_32().inc();
} else if n >= 8 {
stats.storage_queue_above_8().inc();
}
let iiq = &self.insert_item_queue;
self.stats.iiq_len().ingest(iiq.len() as u32);
}
let lts3;
if !self.is_shutdown() {
{
let stats2 = self.stats.clone();
let stats_fn = move |item: &VecDeque<QueryItem>| {
stats2.iiq_batch_len().ingest(item.len() as u32);
@@ -2421,8 +2425,11 @@ impl Stream for CaConn {
cx,
stats_fn
);
lts3 = Instant::now();
}
let lts3 = Instant::now();
if !self.is_shutdown() {
flush_queue!(
self,
writer_establish_qu,
@@ -2434,8 +2441,6 @@ impl Stream for CaConn {
cx,
|_| {}
);
} else {
lts3 = Instant::now();
}
let lts4 = Instant::now();
@@ -2512,23 +2517,24 @@ impl Stream for CaConn {
break if self.is_shutdown() {
if self.queues_out_flushed() {
// debug!("end of stream {}", self.remote_addr_dbg);
debug!("is_shutdown queues_out_flushed set EOS {}", self.remote_addr_dbg);
self.state = CaConnState::EndOfStream;
Ready(None)
} else {
// debug!("queues_out_flushed false");
if have_progress {
debug!("is_shutdown NOT queues_out_flushed prog {}", self.remote_addr_dbg);
self.stats.poll_reloop().inc();
reloops += 1;
continue;
} else if have_pending {
debug!("is_shutdown NOT queues_out_flushed pend {}", self.remote_addr_dbg);
self.stats.poll_pending().inc();
Pending
} else {
// TODO error
error!("shutting down, queues not flushed, no progress, no pending");
self.stats.logic_error().inc();
let e = Error::with_msg_no_trace("shutting down, queues not flushed, no progress, no pending");
let e = Error::ShutdownWithQueuesNoProgressNoPending;
Ready(Some(Err(e)))
}
}
@@ -2551,7 +2557,7 @@ impl Stream for CaConn {
Pending
} else {
self.stats.poll_no_progress_no_pending().inc();
let e = Error::with_msg_no_trace("no progress no pending");
let e = Error::NoProgressNoPending;
Ready(Some(Err(e)))
}
};

View File

@@ -11,7 +11,6 @@ use crate::senderpolling::SenderPolling;
use crate::throttletrace::ThrottleTrace;
use async_channel::Receiver;
use async_channel::Sender;
use atomic::AtomicUsize;
use conn::CaConn;
use conn::CaConnEvent;
use conn::CaConnEventValue;
@@ -85,6 +84,7 @@ const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(4000);
const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000);
const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(30000);
const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(0);
const CHANNEL_BACKOFF: Duration = Duration::from_millis(10000);
const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 3000000;
#[allow(unused)]
@@ -550,68 +550,15 @@ impl CaConnSet {
CaConnEventValue::None => Ok(()),
CaConnEventValue::EchoTimeout => Ok(()),
CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x),
CaConnEventValue::QueryItem(item) => {
error!("TODO remove this insert case");
// self.storage_insert_queue.push_back(item);
Ok(())
}
CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x),
CaConnEventValue::ChannelStatus(st) => self.apply_ca_conn_health_update(addr, st),
CaConnEventValue::Error(e) => self.handle_ca_conn_err(e, addr),
CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr),
CaConnEventValue::ConnectFail => self.handle_connect_fail(addr),
CaConnEventValue::ChannelStatus(st) => {
self.apply_ca_conn_health_update(addr, st)?;
// let sst = &mut self.channel_states;
// for (k, v) in st.channel_statuses {
// if let Some(ch) = self.channel_by_cssid.get(&k) {
// // Only when the channel is active we expect to receive status updates.
// if let Some(st) = sst.get_mut(ch) {
// if let ChannelStateValue::Active(st2) = &mut st.value {
// if let ActiveChannelState::WithStatusSeriesId {
// status_series_id,
// state: st3,
// } = st2
// {
// if let WithStatusSeriesIdStateInner::WithAddress { addr, state: st4 } =
// &mut st3.inner
// {
// if let WithAddressState::Assigned(st5) = st4 {
// } else {
// }
// } else {
// }
// } else {
// }
// } else {
// }
// st.value = ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId {
// status_series_id: (),
// state: WithStatusSeriesIdState {
// addr_find_backoff: todo!(),
// inner: todo!(),
// },
// });
// } else {
// // TODO this should be an error.
// }
// match v.channel_connected_info {
// conn::ChannelConnectedInfo::Disconnected => {}
// conn::ChannelConnectedInfo::Connecting => todo!(),
// conn::ChannelConnectedInfo::Connected => todo!(),
// conn::ChannelConnectedInfo::Error => todo!(),
// conn::ChannelConnectedInfo::Ended => todo!(),
// }
// } else {
// warn!("we do not know {:?}", k);
// }
// }
Ok(())
}
}
}
fn handle_series_lookup_result(&mut self, res: Result<ChannelInfoResult, Error>) -> Result<(), Error> {
debug!("handle_series_lookup_result {res:?}");
if self.shutdown_stopping {
return Ok(());
}
@@ -649,15 +596,13 @@ impl CaConnSet {
if let Some(chst) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(chst2) = &mut chst.value {
if let ActiveChannelState::WaitForStatusSeriesId { .. } = chst2 {
*chst2 = ActiveChannelState::WithStatusSeriesId {
status_series_id: cmd.cssid,
state: WithStatusSeriesIdState {
addr_find_backoff: 0,
inner: WithStatusSeriesIdStateInner::AddrSearchPending {
since: SystemTime::now(),
},
*chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState {
cssid: cmd.cssid,
addr_find_backoff: 0,
inner: WithStatusSeriesIdStateInner::AddrSearchPending {
since: SystemTime::now(),
},
};
});
let qu = IocAddrQuery::cached(cmd.name);
self.find_ioc_query_queue.push_back(qu);
self.stats.ioc_search_start().inc();
@@ -692,14 +637,12 @@ impl CaConnSet {
let ch = Channel::new(cmd.name.clone());
if let Some(chst) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(ast) = &mut chst.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state: st3,
} = ast
{
if let ActiveChannelState::WithStatusSeriesId(st3) = ast {
debug!("handle_add_channel_with_addr INNER {cmd:?}");
self.stats.handle_add_channel_with_addr().inc();
let tsnow = SystemTime::now();
*st3 = WithStatusSeriesIdState {
cssid: cmd.cssid.clone(),
addr_find_backoff: 0,
inner: WithStatusSeriesIdStateInner::WithAddress {
addr: addr_v4,
@@ -710,11 +653,15 @@ impl CaConnSet {
}),
},
};
if !self.ca_conn_ress.contains_key(&cmd.addr) {
let addr = cmd.addr;
if self.ca_conn_ress.contains_key(&addr) {
debug!("ca_conn_ress has already {addr:?}");
} else {
debug!("ca_conn_ress NEW {addr:?}");
let c = self.create_ca_conn(cmd.clone())?;
self.ca_conn_ress.insert(cmd.addr, c);
self.ca_conn_ress.insert(addr, c);
}
let conn_ress = self.ca_conn_ress.get_mut(&cmd.addr).unwrap();
let conn_ress = self.ca_conn_ress.get_mut(&addr).unwrap();
let cmd = ConnCommand::channel_add(cmd.name, cmd.cssid);
conn_ress.cmd_queue.push_back(cmd);
}
@@ -737,10 +684,7 @@ impl CaConnSet {
ActiveChannelState::WaitForStatusSeriesId { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state,
} => match &state.inner {
ActiveChannelState::WithStatusSeriesId(state) => match &state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
@@ -767,10 +711,10 @@ impl CaConnSet {
}
fn handle_ioc_query_result(&mut self, results: VecDeque<FindIocRes>) -> Result<(), Error> {
debug!("handle_ioc_query_result {results:?}");
if self.shutdown_stopping {
return Ok(());
}
trace3!("handle_ioc_query_result");
for res in results {
let ch = Channel::new(res.channel.clone());
if trigger.contains(&ch.id()) {
@@ -778,34 +722,31 @@ impl CaConnSet {
}
if let Some(chst) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(ast) = &mut chst.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id,
state,
} = ast
{
if let ActiveChannelState::WithStatusSeriesId(st2) = ast {
if let Some(addr) = res.addr {
self.stats.ioc_addr_found().inc();
trace3!("ioc found {res:?}");
let since = SystemTime::now();
state.addr_find_backoff = 0;
state.inner = WithStatusSeriesIdStateInner::WithAddress {
addr,
state: WithAddressState::Unassigned { since },
};
debug!("ioc found {res:?}");
if false {
let since = SystemTime::now();
st2.addr_find_backoff = 0;
st2.inner = WithStatusSeriesIdStateInner::WithAddress {
addr,
state: WithAddressState::Unassigned { since },
};
} else {
let cmd = ChannelAddWithAddr {
backend: self.backend.clone(),
name: res.channel,
addr: SocketAddr::V4(addr),
cssid: status_series_id.clone(),
cssid: st2.cssid.clone(),
};
self.handle_add_channel_with_addr(cmd)?;
}
} else {
self.stats.ioc_addr_not_found().inc();
trace3!("ioc not found {res:?}");
debug!("ioc not found {res:?}");
let since = SystemTime::now();
state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since };
st2.inner = WithStatusSeriesIdStateInner::UnknownAddress { since };
}
} else {
self.stats.ioc_addr_result_for_unknown_channel().inc();
@@ -824,6 +765,8 @@ impl CaConnSet {
}
fn handle_check_health(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
let stnow = SystemTime::now();
trace2!("handle_check_health");
if self.shutdown_stopping {
Ok(())
@@ -832,7 +775,7 @@ impl CaConnSet {
self.thr_msg_storage_len
.trigger("connset handle_check_health", &[&self.storage_insert_sender.len()]);
}
self.check_channel_states()?;
self.check_channel_states(tsnow, stnow)?;
let item = CaConnSetItem::Healthy;
self.connset_out_queue.push_back(item);
Ok(())
@@ -877,7 +820,7 @@ impl CaConnSet {
Ok(())
}
fn handle_conn_command_result(&mut self, addr: SocketAddr, res: ConnCommandResult) -> Result<(), Error> {
fn handle_conn_command_result(&mut self, _addr: SocketAddr, res: ConnCommandResult) -> Result<(), Error> {
use crate::ca::conn::ConnCommandResultKind::*;
match res.kind {
Unused => Ok(()),
@@ -897,11 +840,7 @@ impl CaConnSet {
};
if let Some(st1) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(st2) = &mut st1.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state: st3,
} = st2
{
if let ActiveChannelState::WithStatusSeriesId(st3) = st2 {
if let WithStatusSeriesIdStateInner::WithAddress {
addr: conn_addr,
state: st4,
@@ -940,11 +879,7 @@ impl CaConnSet {
let ch = Channel::new(name);
if let Some(st1) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(st2) = &mut st1.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state: st3,
} = st2
{
if let ActiveChannelState::WithStatusSeriesId(st3) = st2 {
trace!("handle_channel_create_fail {addr} {ch:?} set to MaybeWrongAddress");
st3.addr_find_backoff += 1;
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow };
@@ -955,7 +890,7 @@ impl CaConnSet {
}
fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> {
trace2!("handle_ca_conn_eos {addr}");
debug!("handle_ca_conn_eos {addr}");
if let Some(e) = self.ca_conn_ress.remove(&addr) {
self.stats.ca_conn_eos_ok().inc();
self.await_ca_conn_jhs.push_back((addr, e.jh));
@@ -963,12 +898,31 @@ impl CaConnSet {
self.stats.ca_conn_eos_unexpected().inc();
warn!("end-of-stream received for non-existent CaConn {addr}");
}
self.remove_status_for_addr(addr)?;
// self.remove_channel_status_for_addr(addr)?;
trace2!("still CaConn left {}", self.ca_conn_ress.len());
Ok(())
}
fn handle_ca_conn_err(&mut self, e: super::conn::Error, addr: SocketAddr) -> Result<(), Error> {
use super::conn::Error as E2;
error!("received error {addr} {e}");
match e {
E2::ConnectFail => self.handle_connect_fail(addr)?,
_ => {
// TODO others?
}
}
Ok(())
}
/// Reacts to a failed connect to `addr`: forgets the connection resources registered for
/// that address and moves its channels on via `transition_channels_to_maybe_wrong_address`.
fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> {
    // TODO ideally should only remove on EOS.
    // Drop the removed entry right away, before the channel transition runs.
    drop(self.ca_conn_ress.remove(&addr));
    self.transition_channels_to_maybe_wrong_address(addr)
}
fn transition_channels_to_maybe_wrong_address(&mut self, addr: SocketAddr) -> Result<(), Error> {
trace2!("handle_connect_fail {addr}");
let tsnow = SystemTime::now();
for (ch, st1) in self.channel_states.iter_mut() {
@@ -976,10 +930,7 @@ impl CaConnSet {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { since: _ } => {}
ActiveChannelState::WaitForStatusSeriesId { since: _ } => {}
ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state: st3,
} => {
ActiveChannelState::WithStatusSeriesId(st3) => {
if let WithStatusSeriesIdStateInner::WithAddress {
addr: addr_ch,
state: _st4,
@@ -1005,7 +956,28 @@ impl CaConnSet {
Ok(())
}
fn remove_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> {
/// Meant to switch every channel whose assigned address equals `addr` into
/// `WithAddressState::Backoff`.
///
/// NOTE: currently a stub. The `if true` guard below always returns an error, so the loop
/// after it never executes.
fn remove_channel_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> {
debug!("TODO remove_channel_status_for_addr");
// Unconditional early return while the implementation is still marked TODO.
if true {
let e = Error::with_msg_no_trace("TODO remove_channel_status_for_addr");
return Err(e);
}
// Not reached at the moment because of the guard above.
for (_, v) in self.channel_states.iter_mut() {
match &mut v.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner {
WithStatusSeriesIdStateInner::WithAddress { addr: a2, state: st4 } => {
// `a2` is compared as a V4 socket address against the given `addr`.
if SocketAddr::V4(*a2) == addr {
// Backoff starts at the time of this call.
*st4 = WithAddressState::Backoff(Instant::now());
}
}
// Channels without an assigned address are left untouched.
_ => {}
},
_ => {}
},
ChannelStateValue::ToRemove { .. } => {}
}
}
Ok(())
}
@@ -1062,60 +1034,73 @@ impl CaConnSet {
async fn ca_conn_item_merge(
conn: CaConn,
tx1: Sender<(SocketAddr, CaConnEvent)>,
tx2: Sender<VecDeque<QueryItem>>,
_tx2: Sender<VecDeque<QueryItem>>,
addr: SocketAddr,
stats: Arc<CaConnSetStats>,
) -> Result<(), Error> {
stats.ca_conn_task_begin().inc();
trace2!("ca_conn_consumer begin {}", addr);
let connstats = conn.stats();
let mut conn = Box::pin(conn);
let mut ret = Ok(());
let ret = Self::ca_conn_item_merge_inner(Box::pin(conn), tx1.clone(), addr, connstats).await;
trace2!("ca_conn_consumer ended {}", addr);
match ret {
Ok(()) => {
debug!("Sending CaConnEventValue::EndOfStream");
tx1.send((
addr,
CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::EndOfStream,
},
))
.await?;
}
Err(e) => {
error!("ca_conn_item_merge received from inner: {e}");
}
}
stats.ca_conn_task_done().inc();
Ok(())
}
async fn ca_conn_item_merge_inner(
mut conn: Pin<Box<CaConn>>,
tx1: Sender<(SocketAddr, CaConnEvent)>,
addr: SocketAddr,
stats: Arc<CaConnStats>,
) -> Result<(), Error> {
while let Some(item) = conn.next().await {
match item {
Ok(item) => {
connstats.item_count.inc();
stats.item_count.inc();
match item.value {
CaConnEventValue::QueryItem(x) => {
warn!("ca_conn_item_merge should not go here often");
let mut v = VecDeque::new();
v.push_back(x);
if let Err(_) = tx2.send(v).await {
break;
}
}
CaConnEventValue::None
| CaConnEventValue::EchoTimeout
| CaConnEventValue::ConnCommandResult(..)
| CaConnEventValue::ChannelCreateFail(..)
| CaConnEventValue::EndOfStream
| CaConnEventValue::ConnectFail
| CaConnEventValue::ChannelStatus(..) => {
if let Err(_) = tx1.send((addr, item)).await {
break;
| CaConnEventValue::ChannelStatus(..)
| CaConnEventValue::Error(..) => {
if let Err(e) = tx1.send((addr, item)).await {
error!("can not deliver error {e}");
return Err(Error::with_msg_no_trace("can not deliver error"));
}
}
CaConnEventValue::EndOfStream => break,
}
}
Err(e) => {
error!("CaConn gives error: {e:?}");
ret = Err(e);
break;
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::Error(e),
};
if let Err(e) = tx1.send((addr, item)).await {
error!("can not deliver error {e}");
return Err(Error::with_msg_no_trace("can not deliver error"));
}
}
}
}
trace2!("ca_conn_consumer ended {}", addr);
tx1.send((
addr,
CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::EndOfStream,
},
))
.await?;
trace2!("ca_conn_consumer signaled {}", addr);
stats.ca_conn_task_done().inc();
ret
Ok(())
}
fn push_channel_status(&mut self, item: ChannelStatusItem) -> Result<(), Error> {
@@ -1264,7 +1249,7 @@ impl CaConnSet {
Ok(())
}
fn check_channel_states(&mut self) -> Result<(), Error> {
fn check_channel_states(&mut self, tsnow: Instant, stnow: SystemTime) -> Result<(), Error> {
let (mut search_pending_count, mut assigned_without_health_update) = self.update_channel_state_counts();
let mut cmd_remove_channel = Vec::new();
let mut cmd_add_channel = Vec::new();
@@ -1276,8 +1261,6 @@ impl CaConnSet {
} else {
self.channel_states.range_mut(..)
};
let tsnow = SystemTime::now();
for (i, (ch, st)) in it.enumerate() {
match &mut st.value {
ChannelStateValue::Active(st2) => match st2 {
@@ -1286,51 +1269,61 @@ impl CaConnSet {
self.stats.logic_error().inc();
}
ActiveChannelState::WaitForStatusSeriesId { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > Duration::from_millis(5000) {
warn!("timeout can not get status series id for {ch:?}");
*st2 = ActiveChannelState::Init { since: tsnow };
*st2 = ActiveChannelState::Init { since: stnow };
} else {
// TODO
}
}
ActiveChannelState::WithStatusSeriesId {
status_series_id,
state,
} => match &mut state.inner {
ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner {
WithStatusSeriesIdStateInner::UnknownAddress { since } => {
if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ {
if since.checked_add(UNKNOWN_ADDRESS_STAY).unwrap() < tsnow {
if since.checked_add(UNKNOWN_ADDRESS_STAY).unwrap() < stnow {
if false {
// TODO
error!("TODO trigger address search from state UnknownAddress");
if true {
std::process::exit(1);
}
if false {
// TODO
search_pending_count += 1;
st3.inner =
WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };
}
} else {
search_pending_count += 1;
state.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: tsnow };
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };
let qu = IocAddrQuery::uncached(ch.id().into());
self.find_ioc_query_queue.push_back(qu);
self.stats.ioc_search_start().inc();
}
}
}
}
WithStatusSeriesIdStateInner::AddrSearchPending { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > SEARCH_PENDING_TIMEOUT {
debug!("TODO should receive some error indication instead of timeout for {ch:?}");
state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow };
st3.inner = WithStatusSeriesIdStateInner::NoAddress { since: stnow };
search_pending_count -= 1;
}
}
WithStatusSeriesIdStateInner::WithAddress {
addr: addr_v4,
state: st3,
state: st4,
} => {
use WithAddressState::*;
match st3 {
match st4 {
Unassigned { since } => {
if assigned_without_health_update < CHANNEL_MAX_WITHOUT_HEALTH_UPDATE as _ {
if *since + CHANNEL_UNASSIGNED_TIMEOUT < tsnow {
if *since + CHANNEL_UNASSIGNED_TIMEOUT < stnow {
assigned_without_health_update += 1;
let cmd = ChannelAddWithAddr {
backend: self.backend.clone(),
name: ch.id().into(),
cssid: status_series_id.clone(),
cssid: st3.cssid.clone(),
addr: SocketAddr::V4(*addr_v4),
};
cmd_add_channel.push(cmd);
@@ -1338,47 +1331,55 @@ impl CaConnSet {
}
}
Assigned(st4) => {
if st4.updated + CHANNEL_HEALTH_TIMEOUT / 3 < tsnow {
warn!("soon health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
if st4.updated + CHANNEL_HEALTH_TIMEOUT / 3 < stnow {
self.stats.channel_health_timeout_soon().inc();
}
if st4.updated + CHANNEL_HEALTH_TIMEOUT < tsnow {
if st4.updated + CHANNEL_HEALTH_TIMEOUT < stnow {
self.stats.channel_health_timeout().inc();
trace!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
// TODO
error!("TODO health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
std::process::exit(1);
if true {
std::process::exit(1);
}
let addr = SocketAddr::V4(*addr_v4);
cmd_remove_channel.push((addr, ch.clone()));
if st.health_timeout_count < 3 {
state.addr_find_backoff += 1;
state.inner =
WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow };
let item = ChannelStatusItem::new_closed_conn_timeout(
tsnow,
status_series_id.clone(),
);
st3.addr_find_backoff += 1;
st3.inner =
WithStatusSeriesIdStateInner::MaybeWrongAddress { since: stnow };
let item =
ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone());
channel_status_items.push(item);
}
}
}
Backoff(ts) => {
if tsnow.saturating_duration_since(*ts) >= CHANNEL_BACKOFF {
*st4 = Unassigned { since: stnow };
}
}
}
}
WithStatusSeriesIdStateInner::NoAddress { since } => {
if *since + NO_ADDRESS_STAY < tsnow {
state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow };
if *since + NO_ADDRESS_STAY < stnow {
st3.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: stnow };
}
}
WithStatusSeriesIdStateInner::MaybeWrongAddress { since } => {
if *since + (MAYBE_WRONG_ADDRESS_STAY * state.addr_find_backoff.min(10).max(1)) < tsnow {
if *since + (MAYBE_WRONG_ADDRESS_STAY * st3.addr_find_backoff.max(1).min(10)) < stnow {
if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ {
debug!("try again channel after MaybeWrongAddress");
if trigger.contains(&ch.id()) {
debug!("issue ioc search for {}", ch.id());
}
search_pending_count += 1;
state.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: tsnow };
st3.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: stnow };
let qu = IocAddrQuery::uncached(ch.id().into());
self.find_ioc_query_queue.push_back(qu);
self.stats.ioc_search_start().inc();
} else {
debug!("try again channel after MaybeWrongAddress NOT YET");
}
}
}
@@ -1416,6 +1417,7 @@ impl CaConnSet {
let mut search_pending = 0;
let mut no_address = 0;
let mut unassigned = 0;
let mut backoff = 0;
let mut assigned = 0;
let mut connected = 0;
let mut maybe_wrong_address = 0;
@@ -1429,7 +1431,7 @@ impl CaConnSet {
ActiveChannelState::WaitForStatusSeriesId { .. } => {
unknown_address += 1;
}
ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner {
ActiveChannelState::WithStatusSeriesId(st3) => match &st3.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
unknown_address += 1;
}
@@ -1453,6 +1455,9 @@ impl CaConnSet {
}
}
}
WithAddressState::Backoff(ts) => {
backoff += 1;
}
},
WithStatusSeriesIdStateInner::NoAddress { .. } => {
no_address += 1;
@@ -1471,6 +1476,7 @@ impl CaConnSet {
self.stats.channel_search_pending.set(search_pending);
self.stats.channel_no_address.set(no_address);
self.stats.channel_unassigned.set(unassigned);
self.stats.channel_backoff.set(backoff);
self.stats.channel_assigned.set(assigned);
self.stats.channel_connected.set(connected);
self.stats.channel_maybe_wrong_address.set(maybe_wrong_address);
@@ -1634,6 +1640,7 @@ impl Stream for CaConnSet {
if self.find_ioc_query_sender.is_idle() {
if let Some(item) = self.find_ioc_query_queue.pop_front() {
debug!("push find item {item:?}");
self.find_ioc_query_sender.as_mut().send_pin(item);
}
}
@@ -1682,7 +1689,9 @@ impl Stream for CaConnSet {
}
Err(e) => break Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => {}
Ready(None) => {
// TODO trigger shutdown because of error
}
Pending => {
have_pending = true;
}

View File

@@ -6,12 +6,9 @@ use log::*;
use netpod::timeunits::*;
use slidebuf::SlideBuf;
use stats::CaProtoStats;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io;
use std::net::SocketAddrV4;
use std::num::NonZeroU16;
use std::num::NonZeroU64;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
@@ -47,6 +44,7 @@ pub enum Error {
ExtendedHeaderBadCount,
NoReadBufferSpace,
NeitherPendingNorProgress,
OutputBufferTooSmall,
}
// Protocol version number used by this module. In the `CaMsgTy` match visible in this file,
// the `Search(_)` arm evaluates to this constant while most other arms evaluate to `0` or to
// the message's own `data_count`.
// NOTE(review): presumably the Channel Access minor protocol version that is sent in search
// requests — confirm against the protocol specification.
const CA_PROTO_VERSION: u16 = 13;
@@ -421,24 +419,24 @@ impl CaMsgTy {
Search(_) => CA_PROTO_VERSION,
SearchRes(_) => 0,
CreateChan(_) => 0,
CreateChanRes(x) => {
CreateChanRes(..) => {
panic!();
x.data_count as _
// x.data_count as _
}
CreateChanFail(_) => 0,
AccessRightsRes(_) => 0,
EventAdd(x) => x.data_count,
EventAddRes(x) => {
EventAddRes(..) => {
panic!();
x.data_count as _
// x.data_count as _
}
EventAddResEmpty(_) => 0,
EventCancel(x) => x.data_count,
EventCancelRes(x) => 0,
EventCancelRes(..) => 0,
ReadNotify(x) => x.data_count,
ReadNotifyRes(x) => {
ReadNotifyRes(..) => {
panic!();
x.data_count as _
// x.data_count as _
}
Echo => 0,
}
@@ -963,7 +961,6 @@ pub struct CaProto {
outbuf: SlideBuf,
out: VecDeque<CaMsg>,
array_truncate: usize,
logged_proto_error_for_cid: HashMap<u32, bool>,
stats: Arc<CaProtoStats>,
resqu: VecDeque<CaItem>,
}
@@ -978,7 +975,6 @@ impl CaProto {
outbuf: SlideBuf::new(1024 * 128),
out: VecDeque::new(),
array_truncate,
logged_proto_error_for_cid: HashMap::new(),
stats,
resqu: VecDeque::with_capacity(256),
}
@@ -1023,7 +1019,10 @@ impl CaProto {
match w.poll_write(cx, b) {
Ready(k) => match k {
Ok(k) => match self.outbuf.adv(k) {
Ok(()) => Ready(Ok(k)),
Ok(()) => {
self.stats.out_bytes().add(k as u64);
Ready(Ok(k))
}
Err(e) => {
error!("advance error {:?}", e);
Ready(Err(e.into()))
@@ -1043,16 +1042,22 @@ impl CaProto {
let mut have_pending = false;
let mut have_progress = false;
let tsnow = Instant::now();
{
let g = self.outbuf.len();
self.stats.outbuf_len().ingest(g as u32);
}
'l1: while self.out.len() != 0 {
while let Some((msg, buf)) = self.out_msg_buf() {
let msglen = msg.len();
if msglen > buf.len() {
error!("got output buffer but too small");
break;
let e = Error::OutputBufferTooSmall;
return Err(e);
} else {
msg.place_into(&mut buf[..msglen]);
self.outbuf.wadv(msglen)?;
self.out.pop_front();
self.stats.out_msg_placed().inc();
}
}
while self.outbuf.len() != 0 {
@@ -1105,14 +1110,15 @@ impl CaProto {
Ok(()) => {
let nf = rbuf.filled().len();
if nf == 0 {
info!(
"EOF peer {:?} {:?} {:?}",
debug!(
"peer done {:?} {:?} {:?}",
self.tcp.peer_addr(),
self.remote_addr_dbg,
self.state
);
// TODO may need another state, if not yet done when input is EOF.
self.state = CaState::Done;
have_progress = true;
} else {
if false {
info!("received {} bytes", rbuf.filled().len());
@@ -1166,7 +1172,6 @@ impl CaProto {
CaState::StdHead => {
let hi = HeadInfo::from_netbuf(&mut self.buf)?;
if hi.cmdid == 1 || hi.cmdid == 15 {
let sid = hi.param1;
if hi.payload_size == 0xffff {
if hi.data_count != 0 {
warn!("protocol error: {hi:?}");
@@ -1226,7 +1231,7 @@ impl CaProto {
let g = self.buf.read_bytes(hi.payload_len())?;
let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?;
// data-count is only reasonable for event messages
if let CaMsgTy::EventAddRes(e) = &msg.ty {
if let CaMsgTy::EventAddRes(..) = &msg.ty {
self.stats.data_count().ingest(hi.data_count() as u32);
}
self.state = CaState::StdHead;

View File

@@ -55,6 +55,7 @@ pub enum WithAddressState {
since: SystemTime,
},
Assigned(ConnectionState),
Backoff(#[serde(with = "serde_helper::serde_Instant")] Instant),
}
#[derive(Debug, Clone, Serialize)]
@@ -83,6 +84,7 @@ pub enum WithStatusSeriesIdStateInner {
/// Per-channel state carried by `ActiveChannelState::WithStatusSeriesId`.
///
/// Bundles the channel's status series id with the bookkeeping that
/// `check_channel_states` needs to drive the address search / assignment state machine.
#[derive(Debug, Clone, Serialize)]
pub struct WithStatusSeriesIdState {
/// Status series id of the channel. It is cloned into `ChannelAddWithAddr` commands and into
/// the `ChannelStatusItem` that is emitted on a health timeout.
pub cssid: ChannelStatusSeriesId,
/// Incremented when an assigned channel runs into a health timeout. While in
/// `MaybeWrongAddress` the stay duration is `MAYBE_WRONG_ADDRESS_STAY` multiplied by this
/// counter, clamped to the range `1..=10`.
pub addr_find_backoff: u32,
/// Current position in the address state machine (`UnknownAddress`, `AddrSearchPending`,
/// `WithAddress`, `NoAddress`, `MaybeWrongAddress`).
pub inner: WithStatusSeriesIdStateInner,
}
@@ -97,10 +99,7 @@ pub enum ActiveChannelState {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
WithStatusSeriesId {
status_series_id: ChannelStatusSeriesId,
state: WithStatusSeriesIdState,
},
WithStatusSeriesId(WithStatusSeriesIdState),
}
#[derive(Debug, Clone, Serialize)]

View File

@@ -1,5 +1,5 @@
#[allow(non_snake_case)]
mod serde_Instant {
pub mod serde_Instant {
use serde::Serializer;
use std::time::Instant;

View File

@@ -240,6 +240,128 @@ impl XorShift32 {
}
stats_proc::stats_struct!((
stats_struct(
name(DaemonStats),
prefix(daemon),
counters(
critical_error,
todo_mark,
ticker_token_acquire_error,
ticker_token_release_error,
handle_timer_tick_count,
ioc_search_err,
ioc_search_some,
ioc_search_none,
lookupaddr_ok,
events,
event_ca_conn,
ca_conn_status_done,
ca_conn_status_feedback_timeout,
ca_conn_status_feedback_recv,
ca_conn_status_feedback_no_dst,
ca_echo_timeout_total,
caconn_done_channel_state_reset,
insert_worker_spawned,
insert_worker_join_ok,
insert_worker_join_ok_err,
insert_worker_join_err,
caconnset_health_response,
),
values(
channel_unknown_address,
channel_search_pending,
channel_with_address,
channel_no_address,
connset_health_lat_ema,
),
),
agg(name(DaemonStatsAgg), parent(DaemonStats)),
diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)),
stats_struct(
name(CaConnStats),
prefix(caconn),
counters(
insert_item_create,
inserts_val,
inserts_msp,
inserts_msp_grid,
inserts_queue_pop_for_global,
inserts_queue_push,
inserts_queue_drop,
insert_item_queue_pressure,
insert_item_queue_full,
out_queue_full,
channel_fast_item_drop,
logic_error,
// TODO maybe rename: this is now only the recv of the intermediate queue:
store_worker_item_recv,
// TODO rename to make clear that this drop is voluntary because of user config choice:
// store_worker_fraction_drop,
// store_worker_ratelimit_drop,
// store_worker_insert_done,
// store_worker_insert_binned_done,
// store_worker_insert_overload,
// store_worker_insert_timeout,
// store_worker_insert_unavailable,
// store_worker_insert_error,
connection_status_insert_done,
channel_status_insert_done,
channel_info_insert_done,
ivl_insert_done,
mute_insert_done,
poll_count,
loop1_count,
loop2_count,
loop3_count,
loop4_count,
command_can_not_reply,
time_handle_conn_listen,
time_handle_peer_ready,
time_check_channels_state_init,
time_handle_event_add_res,
tcp_connected,
get_series_id_ok,
item_count,
stream_ready,
stream_pending,
channel_all_count,
channel_alive_count,
channel_not_alive_count,
channel_series_lookup_already_pending,
ping_start,
ping_no_proto,
pong_timeout,
poll_fn_begin,
poll_loop_begin,
poll_reloop,
poll_pending,
poll_no_progress_no_pending,
poll_wake_break,
storage_queue_send,
storage_queue_pending,
event_add_res_recv,
caget_issued,
caget_timeout,
unknown_subid,
unknown_ioid,
transition_to_polling,
transition_to_polling_already_in,
transition_to_polling_bad_state,
),
values(inter_ivl_ema, read_ioids_len, proto_out_len,),
histolog2s(
poll_all_dt,
poll_op3_dt,
poll_reloops,
pong_recv_lat,
ca_ts_off,
iiq_len,
iiq_batch_len,
caget_lat,
),
),
agg(name(CaConnStatsAgg), parent(CaConnStats)),
diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),
stats_struct(
name(CaProtoStats),
prefix(ca_proto),
@@ -250,8 +372,10 @@ stats_proc::stats_struct!((
payload_std_too_large,
payload_ext_but_small,
payload_ext_very_large,
out_msg_placed,
out_bytes,
),
histolog2s(payload_size, data_count,),
histolog2s(payload_size, data_count, outbuf_len,),
),
stats_struct(
name(CaConnSetStats),
@@ -259,6 +383,7 @@ stats_proc::stats_struct!((
counters(
channel_add,
channel_status_series_found,
channel_health_timeout_soon,
channel_health_timeout,
ioc_search_start,
ioc_addr_found,
@@ -270,6 +395,7 @@ stats_proc::stats_struct!((
ca_conn_task_join_done_err,
ca_conn_task_join_err,
ca_conn_eos_ok,
ca_conn_eos_err,
ca_conn_eos_unexpected,
response_tx_fail,
try_push_ca_conn_cmds_sent,
@@ -300,6 +426,7 @@ stats_proc::stats_struct!((
channel_search_pending,
channel_no_address,
channel_unassigned,
channel_backoff,
channel_assigned,
channel_connected,
channel_maybe_wrong_address,
@@ -370,130 +497,6 @@ stats_proc::stats_struct!((
stats_struct(name(SeriesWriterEstablishStats), prefix(wrest), counters(job_recv,),),
));
// Declares the `CaConnStats` statistics struct (metric prefix `caconn`) together with the
// derived `CaConnStatsAgg` (agg, parent `CaConnStats`) and `CaConnStatsAggDiff` (diff, input
// `CaConnStatsAgg`) types through the `stats_struct!` proc macro.
//
// The macro expansion is not visible here. Elsewhere in this code base counters declared this
// way are driven with `.inc()` / `.add(n)` and `histolog2s` entries with `.ingest(v)`;
// for example `item_count` is incremented for every item received from a `CaConn` stream.
// NOTE(review): `values` are presumably gauges that are overwritten via `.set(v)`, and
// `histolog2s` presumably log2-bucketed histograms — confirm against the `stats_proc` crate.
stats_proc::stats_struct!((
stats_struct(
name(CaConnStats),
prefix(caconn),
// Event counters.
counters(
insert_item_create,
inserts_val,
inserts_msp,
inserts_msp_grid,
inserts_queue_pop_for_global,
inserts_queue_push,
inserts_queue_drop,
insert_item_queue_pressure,
insert_item_queue_full,
out_queue_full,
channel_fast_item_drop,
logic_error,
// TODO maybe rename: this is now only the recv of the intermediate queue:
store_worker_item_recv,
// TODO rename to make clear that this drop is voluntary because of user config choice:
// store_worker_fraction_drop,
// store_worker_ratelimit_drop,
// store_worker_insert_done,
// store_worker_insert_binned_done,
// store_worker_insert_overload,
// store_worker_insert_timeout,
// store_worker_insert_unavailable,
// store_worker_insert_error,
connection_status_insert_done,
channel_status_insert_done,
channel_info_insert_done,
ivl_insert_done,
mute_insert_done,
poll_count,
loop1_count,
loop2_count,
loop3_count,
loop4_count,
command_can_not_reply,
time_handle_conn_listen,
time_handle_peer_ready,
time_check_channels_state_init,
time_handle_event_add_res,
tcp_connected,
get_series_id_ok,
item_count,
stream_ready,
stream_pending,
channel_all_count,
channel_alive_count,
channel_not_alive_count,
channel_series_lookup_already_pending,
ping_start,
ping_no_proto,
pong_timeout,
poll_fn_begin,
poll_loop_begin,
poll_reloop,
poll_pending,
poll_no_progress_no_pending,
poll_wake_break,
storage_queue_send,
storage_queue_pending,
storage_queue_above_8,
storage_queue_above_32,
storage_queue_above_128,
event_add_res_recv,
caget_timeout,
),
// Single-value metrics.
values(inter_ivl_ema, read_ioids_len, proto_out_len,),
// Distribution metrics.
histolog2s(
poll_all_dt,
poll_op3_dt,
poll_reloops,
pong_recv_lat,
ca_ts_off,
iiq_batch_len,
caget_lat,
),
),
// Derived types built on top of `CaConnStats`.
agg(name(CaConnStatsAgg), parent(CaConnStats)),
diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),
));
// Declares the `DaemonStats` statistics struct (metric prefix `daemon`) together with the
// derived `DaemonStatsAgg` (agg, parent `DaemonStats`) and `DaemonStatsAggDiff` (diff, input
// `DaemonStatsAgg`) types through the `stats_struct!` proc macro.
//
// The macro expansion is not visible here.
// NOTE(review): `counters` are presumably monotonically increasing event counts and `values`
// presumably gauges holding the latest sample — confirm against the `stats_proc` crate.
stats_proc::stats_struct!((
stats_struct(
name(DaemonStats),
prefix(daemon),
// Event counters.
counters(
critical_error,
todo_mark,
ticker_token_acquire_error,
ticker_token_release_error,
handle_timer_tick_count,
ioc_search_err,
ioc_search_some,
ioc_search_none,
lookupaddr_ok,
events,
event_ca_conn,
ca_conn_status_done,
ca_conn_status_feedback_timeout,
ca_conn_status_feedback_recv,
ca_conn_status_feedback_no_dst,
ca_echo_timeout_total,
caconn_done_channel_state_reset,
insert_worker_spawned,
insert_worker_join_ok,
insert_worker_join_ok_err,
insert_worker_join_err,
caconnset_health_response,
),
// Single-value metrics.
values(
channel_unknown_address,
channel_search_pending,
channel_with_address,
channel_no_address,
connset_health_lat_ema,
),
),
// Derived types built on top of `DaemonStats`.
agg(name(DaemonStatsAgg), parent(DaemonStats)),
diff(name(DaemonStatsAggDiff), input(DaemonStatsAgg)),
));
stats_proc::stats_struct!((
stats_struct(name(TestStats0), counters(count0,), values(val0),),
diff(name(TestStats0Diff), input(TestStats0)),