WIP refactor channel status write

This commit is contained in:
Dominik Werder
2024-07-26 11:54:42 +02:00
parent 5319a9271d
commit a5caec0591
16 changed files with 360 additions and 321 deletions

View File

@@ -56,6 +56,9 @@ use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use serieswriter::binwriter::BinWriter;
use serieswriter::fixgridwriter::ChannelStatusSeriesWriter;
use serieswriter::fixgridwriter::ChannelStatusWriteState;
use serieswriter::fixgridwriter::CHANNEL_STATUS_GRID;
use serieswriter::msptool::MspSplit;
use serieswriter::rtwriter::RtWriter;
use serieswriter::writer::EmittableType;
@@ -156,6 +159,7 @@ pub enum Error {
Protocol(#[from] crate::ca::proto::Error),
RtWriter(#[from] serieswriter::rtwriter::Error),
BinWriter(#[from] serieswriter::binwriter::Error),
SeriesWriter(#[from] serieswriter::writer::Error),
// TODO remove false positive from ThisError derive
#[allow(private_interfaces)]
UnknownCid(Cid),
@@ -538,9 +542,24 @@ struct ClosingState {
struct ChannelConf {
conf: ChannelConfig,
state: ChannelState,
wrst: WriterStatus,
}
impl ChannelConf {
/// Builds the per-channel entry in its initial state.
///
/// The channel starts out as `ChannelState::Init(cssid)`. The status writer
/// and its write state are both created for the series id derived from
/// `cssid.id()`.
///
/// # Panics
///
/// Panics if `SeriesWriter::new` returns an error for that series id.
fn new(conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Self {
    let writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cssid.id())).unwrap();
    let writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new(
        SeriesId::new(cssid.id()),
        serieswriter::fixgridwriter::CHANNEL_STATUS_GRID,
    );
    let wrst = WriterStatus {
        writer_status,
        writer_status_state,
    };
    Self {
        conf,
        state: ChannelState::Init(cssid),
        wrst,
    }
}
/// Returns the polling configuration of the wrapped channel config by
/// delegating to `poll_conf` on `self.conf`.
///
/// NOTE(review): the single tuple element is presumably the poll interval;
/// confirm against `ChannelConfig::poll_conf`.
pub fn poll_conf(&self) -> Option<(u64,)> {
self.conf.poll_conf()
}
@@ -665,6 +684,29 @@ impl ChannelState {
}
}
/// Bundles the channel status series writer with the mutable write state
/// that is passed to it on every write.
#[derive(Debug)]
struct WriterStatus {
// Writer through which channel status values are written.
writer_status: ChannelStatusSeriesWriter,
// State handed to `writer_status.write` as `&mut` on each call.
writer_status_state: ChannelStatusWriteState,
}
impl WriterStatus {
    /// Writes one channel status item through the status series writer.
    ///
    /// The item is split into its timestamp and value via `to_ts_val`,
    /// wrapped in a `ChannelStatusWriteValue`, and handed to the writer
    /// together with the write state, the current `Instant` and `deque`.
    /// NOTE(review): the writer presumably appends the resulting query items
    /// to `deque`; confirm in `SeriesWriter::write`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the writer's `write` call.
    fn emit_channel_status_item(
        &mut self,
        item: ChannelStatusItem,
        deque: &mut VecDeque<QueryItem>,
    ) -> Result<(), Error> {
        let (ts, val) = item.to_ts_val();
        let value = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, val);
        let tsnow = Instant::now();
        let state = &mut self.writer_status_state;
        self.writer_status.write(value, state, tsnow, deque)?;
        Ok(())
    }
}
enum CaConnState {
Unconnected(Instant),
Connecting(
@@ -1128,17 +1170,24 @@ impl CaConn {
self.channel_state_on_shutdown(channel_reason);
let addr = self.remote_addr_dbg.clone();
// TODO handle Err:
let _ = self
.iqdqs
.emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
// TODO map to appropriate status
status: ConnectionStatus::Closing,
}));
let item = ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
// TODO map to appropriate status
status: ConnectionStatus::Closing,
};
if self.emit_connection_status_item(item).is_err() {
self.stats.logic_error().inc();
}
self.proto = None;
}
/// Placeholder for emitting a connection status item.
///
/// Currently the item is dropped and `Ok(())` is returned unconditionally;
/// nothing is queued or written.
fn emit_connection_status_item(&mut self, _item: ConnectionStatusItem) -> Result<(), Error> {
// TODO emit the item. Until this is implemented, the connection status
// changes passed in by callers are silently discarded.
Ok(())
}
fn cmd_channel_close(&mut self, name: String) {
self.channel_close(name);
// TODO return the result
@@ -1279,8 +1328,7 @@ impl CaConn {
if let Some(conf) = self.channels.get_mut(&cid) {
// TODO refactor, should only execute this when required:
let conf_poll_conf = conf.poll_conf();
let chst = &mut conf.state;
if let ChannelState::MakingSeriesWriter(st2) = chst {
if let ChannelState::MakingSeriesWriter(st2) = &mut conf.state {
let dt = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap();
let beg = TsNano::from_ns(SEC * dt.as_secs() + dt.subsec_nanos() as u64);
let binwriter = BinWriter::new(
@@ -1293,12 +1341,13 @@ impl CaConn {
)?;
self.stats.get_series_id_ok.inc();
{
let item = QueryItem::ChannelStatus(ChannelStatusItem {
info!("queued Opened {:?}", st2.channel.cssid);
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::Opened,
});
self.iqdqs.emit_status_item(item)?;
};
conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
}
if let Some((ivl,)) = conf_poll_conf {
let created_state = WritableState {
@@ -1380,10 +1429,7 @@ impl CaConn {
error!("logic error channel already exists {conf:?}");
Ok(())
} else {
let conf = ChannelConf {
conf,
state: ChannelState::Init(cssid),
};
let conf = ChannelConf::new(conf, cssid);
self.channels.insert(cid, conf);
// TODO do not count, use separate queue for those channels.
self.init_state_count += 1;
@@ -1472,12 +1518,15 @@ impl CaConn {
let cssid = st2.channel.cssid.clone();
// TODO should call the proper channel-close handler which in turn emits the status item.
// Make sure I record the reason for the "Close": user command, IOC error, etc..
let item = QueryItem::ChannelStatus(ChannelStatusItem {
let item = ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: cssid.clone(),
status: ChannelStatus::Closed(channel_reason.clone()),
});
self.iqdqs.emit_status_item(item);
};
let deque = &mut self.iqdqs.st_rf3_qu;
if conf.wrst.emit_channel_status_item(item, deque).is_err() {
self.stats.logic_error().inc();
}
*chst = ChannelState::Ended(cssid);
}
ChannelState::Error(..) => {
@@ -2617,12 +2666,11 @@ impl CaConn {
Ok(Ok(tcp)) => {
self.stats.tcp_connected.inc();
let addr = addr.clone();
self.iqdqs
.emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::Established,
}))?;
self.emit_connection_status_item(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::Established,
})?;
self.backoff_reset();
let proto = CaProto::new(
tcp,
@@ -2638,12 +2686,11 @@ impl CaConn {
use std::io::ErrorKind;
debug!("error connect to {addr} {e}");
let addr = addr.clone();
self.iqdqs
.emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectError,
}))?;
self.emit_connection_status_item(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectError,
})?;
let reason = match e.kind() {
ErrorKind::ConnectionRefused => ShutdownReason::ConnectRefused,
_ => ShutdownReason::IoError,
@@ -2655,12 +2702,11 @@ impl CaConn {
// TODO log with exponential backoff
debug!("timeout connect to {addr} {e}");
let addr = addr.clone();
self.iqdqs
.emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectTimeout,
}))?;
self.emit_connection_status_item(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectTimeout,
})?;
self.trigger_shutdown(ShutdownReason::ConnectTimeout);
Ok(Ready(Some(())))
}
@@ -2876,8 +2922,8 @@ impl CaConn {
}
fn emit_channel_event_pong(&mut self) {
for (cid, ch) in self.channels.iter() {
match &ch.state {
for (_, ch) in self.channels.iter_mut() {
match &mut ch.state {
ChannelState::Init(_) => {}
ChannelState::Creating(_) => {}
ChannelState::FetchEnumDetails(_) => {}
@@ -2889,8 +2935,10 @@ impl CaConn {
cssid: st1.channel.cssid,
status: ChannelStatus::Pong,
};
let item = QueryItem::ChannelStatus(item);
self.iqdqs.st_rf3_rx.push_back(item);
let deque = &mut self.iqdqs.st_rf3_qu;
if ch.wrst.emit_channel_status_item(item, deque).is_err() {
self.stats.logic_error().inc();
}
}
ChannelState::Closing(_) => {}
ChannelState::Error(_) => {}
@@ -2909,10 +2957,6 @@ impl CaConn {
Ok(())
}
fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> {
Ok(())
}
fn queues_out_flushed(&self) -> bool {
debug!(
"async out flushed iiq {} {} caout {}",
@@ -3024,7 +3068,6 @@ macro_rules! flush_queue_dqs {
// let sp = std::pin::pin!(obj.iqsp.$sp);
// let sp = &mut obj.iqsp.$sp;
// let sp = std::pin::pin!(sp);
// let sp = todo!();
let sp = obj.iqsp.as_mut().$sp();
match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) {
Ok(Ready(Some(()))) => {
@@ -3108,7 +3151,7 @@ impl Stream for CaConn {
};
flush_queue_dqs!(
self,
st_rf1_rx,
st_rf1_qu,
st_rf1_sp_pin,
send_batched::<256, _>,
32,
@@ -3124,7 +3167,7 @@ impl Stream for CaConn {
};
flush_queue_dqs!(
self,
st_rf3_rx,
st_rf3_qu,
st_rf3_sp_pin,
send_batched::<256, _>,
32,
@@ -3140,7 +3183,7 @@ impl Stream for CaConn {
};
flush_queue_dqs!(
self,
mt_rf3_rx,
mt_rf3_qu,
mt_rf3_sp_pin,
send_batched::<256, _>,
32,
@@ -3156,7 +3199,7 @@ impl Stream for CaConn {
};
flush_queue_dqs!(
self,
lt_rf3_rx,
lt_rf3_qu,
lt_rf3_sp_pin,
send_batched::<256, _>,
32,
@@ -3489,7 +3532,7 @@ impl EmittableType for CaWriterValue {
ts_net,
)));
}
let data_value = DataValue::Scalar(ScalarValue::CaStatus(meta.status as i16));
let data_value = DataValue::Scalar(ScalarValue::I16(meta.status as i16));
let item = scywriiq::InsertItem {
series: state.series_status.clone(),
ts_msp: ts_msp.to_ts_ms(),

View File

@@ -63,6 +63,7 @@ use std::pin::Pin;
use netpod::OnDrop;
use scywr::insertqueues::InsertQueuesTx;
use series::SeriesId;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
@@ -173,7 +174,7 @@ pub struct ChannelStatusRequest {
pub tx: Sender<ChannelStatusResponse>,
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Serialize)]
pub struct ChannelStatusResponse {
pub channels_ca_conn: BTreeMap<String, ChannelStateInfo>,
pub channels_ca_conn_set: BTreeMap<String, ChannelState>,
@@ -191,7 +192,7 @@ pub struct ChannelStatusesRequest {
pub tx: Sender<ChannelStatusesResponse>,
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Serialize)]
pub struct ChannelStatusesResponse {
pub channels_ca_conn_set: BTreeMap<String, ChannelState>,
}
@@ -238,10 +239,6 @@ pub struct CaConnSetCtrl {
}
impl CaConnSetCtrl {
pub fn new() -> Self {
todo!()
}
/// Returns a clone of the `CaConnSetEvent` sender held in `self.tx`.
pub fn sender(&self) -> Sender<CaConnSetEvent> {
self.tx.clone()
}
@@ -561,7 +558,7 @@ impl CaConnSet {
backend: cmd.backend,
channel: channel_name,
kind: SeriesKind::ChannelStatus,
scalar_type: ScalarType::ChannelStatus,
scalar_type: ScalarType::U64,
shape: Shape::Scalar,
tx: Box::pin(SeriesLookupSender { tx }),
};
@@ -636,12 +633,20 @@ impl CaConnSet {
self.cssid_latency_max = dt + Duration::from_millis(2000);
debug!("slow cssid fetch dt {:.0} ms {:?}", 1e3 * dt.as_secs_f32(), cmd);
}
let writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new(
SeriesId::new(cmd.cssid.id()),
serieswriter::fixgridwriter::CHANNEL_STATUS_GRID,
);
*chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState {
cssid: cmd.cssid,
addr_find_backoff: 0,
inner: WithStatusSeriesIdStateInner::AddrSearchPending {
since: SystemTime::now(),
},
writer_status: Some(writer_status),
writer_status_state: Some(writer_status_state),
});
let qu = IocAddrQuery::cached(name.into());
self.find_ioc_query_queue.push_back(qu);
@@ -682,6 +687,12 @@ impl CaConnSet {
trace!("handle_add_channel_with_addr INNER {cmd:?}");
self.stats.handle_add_channel_with_addr().inc();
let tsnow = SystemTime::now();
let writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new(
SeriesId::new(cmd.cssid.id()),
serieswriter::fixgridwriter::CHANNEL_STATUS_GRID,
);
*st3 = WithStatusSeriesIdState {
cssid: cmd.cssid.clone(),
addr_find_backoff: 0,
@@ -693,6 +704,8 @@ impl CaConnSet {
value: ConnectionStateValue::Unknown,
}),
},
writer_status: Some(writer_status),
writer_status_state: Some(writer_status_state),
};
let addr = cmd.addr;
if self.ca_conn_ress.contains_key(&addr) {
@@ -1131,14 +1144,6 @@ impl CaConnSet {
}
}
fn push_channel_status(&mut self, item: ChannelStatusItem) -> Result<(), Error> {
let item = QueryItem::ChannelStatus(item);
let mut v = VecDeque::new();
v.push_back(item);
self.storage_insert_queue.push_back(v);
Ok(())
}
#[allow(unused)]
async fn __enqueue_command_to_all<F>(&self, cmdgen: F) -> Result<Vec<CmdId>, Error>
where
@@ -1272,7 +1277,6 @@ impl CaConnSet {
let (mut search_pending_count, mut assigned_without_health_update) = self.update_channel_state_counts();
let mut cmd_remove_channel = Vec::new();
let mut cmd_add_channel = Vec::new();
let mut channel_status_items = Vec::new();
let k = self.chan_check_next.take();
let it = if let Some(last) = k {
trace!("check_chans start at {:?}", last);
@@ -1280,6 +1284,7 @@ impl CaConnSet {
} else {
self.channel_states.range_mut(..)
};
let mut item_deque = VecDeque::new();
for (i, (ch, st)) in it.enumerate() {
match &mut st.value {
ChannelStateValue::Active(st2) => match st2 {
@@ -1368,7 +1373,18 @@ impl CaConnSet {
MaybeWrongAddressState::new(stnow, st3.addr_find_backoff),
);
let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone());
channel_status_items.push(item);
let (ts, val) = item.to_ts_val();
let deque = &mut item_deque;
st3.writer_status
.as_mut()
.unwrap()
.write(
serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, val),
st3.writer_status_state.as_mut().unwrap(),
tsnow,
deque,
)
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
}
}
}
@@ -1404,9 +1420,7 @@ impl CaConnSet {
break;
}
}
for item in channel_status_items {
self.push_channel_status(item)?;
}
self.storage_insert_queue.push_back(item_deque);
for (addr, ch) in cmd_remove_channel {
if let Some(g) = self.ca_conn_ress.get_mut(&addr) {
let cmd = ConnCommand::channel_close(ch.name().into());

View File

@@ -4,6 +4,8 @@ use crate::daemon_common::Channel;
use dashmap::DashMap;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use serieswriter::fixgridwriter::ChannelStatusSeriesWriter;
use serieswriter::fixgridwriter::ChannelStatusWriteState;
use std::collections::btree_map::RangeMut;
use std::collections::BTreeMap;
use std::collections::HashMap;
@@ -105,11 +107,36 @@ impl MaybeWrongAddressState {
}
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Serialize)]
/// Channel state once the channel status series id is known.
///
/// The two writer fields are not part of the serialized output:
/// `writer_status` goes through a serializer function that always emits
/// `none`, and `writer_status_state` is skipped.
pub struct WithStatusSeriesIdState {
pub cssid: ChannelStatusSeriesId,
pub addr_find_backoff: u32,
pub inner: WithStatusSeriesIdStateInner,
// Channel status writer; `None` on values produced by `Clone`.
#[serde(serialize_with = "serde_ser_channel_status_writer")]
pub writer_status: Option<ChannelStatusSeriesWriter>,
// Write state passed to the writer; `None` on values produced by `Clone`.
#[serde(skip)]
pub writer_status_state: Option<ChannelStatusWriteState>,
}
// Clone is required because the state tree is cloned for metrics output.
// The clone deliberately carries no writer and no writer state: both fields
// are set to `None` on the copy.
// TODO use a new info struct
impl Clone for WithStatusSeriesIdState {
    fn clone(&self) -> Self {
        let Self {
            cssid,
            addr_find_backoff,
            inner,
            ..
        } = self;
        Self {
            cssid: cssid.clone(),
            addr_find_backoff: *addr_find_backoff,
            inner: inner.clone(),
            writer_status: None,
            writer_status_state: None,
        }
    }
}
/// Serializer hook for the `writer_status` field.
///
/// The writer itself is never serialized: whatever the field holds, the
/// output is always `none`.
fn serde_ser_channel_status_writer<S: serde::Serializer>(
    _writer: &Option<ChannelStatusSeriesWriter>,
    serializer: S,
) -> Result<S::Ok, S::Error> {
    serializer.serialize_none()
}
#[derive(Debug, Clone, Serialize)]
@@ -137,7 +164,7 @@ pub struct ChannelState {
pub config: ChannelConfig,
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Serialize)]
pub struct ChannelStateMap {
map: BTreeMap<Channel, ChannelState>,
#[serde(skip)]

View File

@@ -245,7 +245,6 @@ async fn post_v01_try(
})?;
}
ScalarType::Enum => return Err(Error::NotSupported),
ScalarType::ChannelStatus => return Err(Error::NotSupported),
},
Shape::Wave(_) => match &scalar_type {
ScalarType::U8 => {
@@ -281,7 +280,6 @@ async fn post_v01_try(
ScalarType::BOOL => return Err(Error::NotSupported),
ScalarType::STRING => return Err(Error::NotSupported),
ScalarType::Enum => return Err(Error::NotSupported),
ScalarType::ChannelStatus => return Err(Error::NotSupported),
},
Shape::Image(_, _) => return Err(Error::NotSupported),
}