This commit is contained in:
Dominik Werder
2023-02-17 14:24:55 +01:00
parent cd13cc8374
commit 1ba74e1342
+43 -53
View File
@@ -140,6 +140,7 @@ struct CreatedState {
sid: u32,
scalar_type: ScalarType,
shape: Shape,
#[allow(unused)]
ts_created: Instant,
ts_alive_last: Instant,
state: MonitoringState,
@@ -151,7 +152,6 @@ struct CreatedState {
insert_recv_ivl_last: Instant,
insert_next_earliest: Instant,
muted_before: u32,
series: Option<SeriesId>,
info_store_msp_last: u32,
}
@@ -164,7 +164,8 @@ enum ChannelState {
cid: Cid,
ts_beg: Instant,
},
Created(CreatedState),
FetchingSeriesId(CreatedState),
Created(SeriesId, CreatedState),
Error(ChannelError),
Ended,
}
@@ -172,33 +173,34 @@ enum ChannelState {
impl ChannelState {
fn to_info(&self, name: String, addr: SocketAddrV4) -> ChannelStateInfo {
let channel_connected_info = match self {
ChannelState::Init(_) => ChannelConnectedInfo::Disconnected,
ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
ChannelState::Created(_) => ChannelConnectedInfo::Connected,
ChannelState::Error(_) => ChannelConnectedInfo::Error,
ChannelState::FetchingSeriesId(..) => ChannelConnectedInfo::Connecting,
ChannelState::Created(..) => ChannelConnectedInfo::Connected,
ChannelState::Error(..) => ChannelConnectedInfo::Error,
ChannelState::Ended => ChannelConnectedInfo::Ended,
};
let scalar_type = match self {
ChannelState::Created(s) => Some(s.scalar_type.clone()),
ChannelState::Created(_series, s) => Some(s.scalar_type.clone()),
_ => None,
};
let shape = match self {
ChannelState::Created(s) => Some(s.shape.clone()),
ChannelState::Created(_series, s) => Some(s.shape.clone()),
_ => None,
};
let ts_created = match self {
ChannelState::Created(s) => Some(s.ts_created.clone()),
ChannelState::Created(_series, s) => Some(s.ts_created.clone()),
_ => None,
};
let ts_event_last = match self {
ChannelState::Created(s) => match &s.state {
ChannelState::Created(_series, s) => match &s.state {
MonitoringState::Evented(_, s) => Some(s.ts_last),
_ => None,
},
_ => None,
};
let item_recv_ivl_ema = match self {
ChannelState::Created(s) => {
ChannelState::Created(_series, s) => {
let ema = s.item_recv_ivl_ema.ema();
if ema.update_count() == 0 {
None
@@ -209,7 +211,7 @@ impl ChannelState {
_ => None,
};
let series = match self {
ChannelState::Created(s) => s.series.clone(),
ChannelState::Created(series, _) => Some(series.clone()),
_ => None,
};
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
@@ -706,10 +708,6 @@ impl CaConn {
}
}
fn cid_by_name(&mut self, name: &str) -> Cid {
Self::cid_by_name_expl(name, &mut self.cid_by_name, &mut self.name_by_cid, &mut self.cid_store)
}
fn name_by_cid(&self, cid: Cid) -> Option<&str> {
self.name_by_cid.get(&cid).map(|x| x.as_str())
}
@@ -726,32 +724,27 @@ impl CaConn {
fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
trace!("channel_state_on_shutdown channels {}", self.channels.len());
let mut warn_max = 0;
for (_cid, chst) in &mut self.channels {
match chst {
ChannelState::Init(cssid) => {
ChannelState::Init(..) => {
*chst = ChannelState::Ended;
}
ChannelState::Creating { .. } => {
*chst = ChannelState::Ended;
}
ChannelState::Created(st) => {
if let Some(series) = &st.series {
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
series: series.clone(),
status: ChannelStatus::Closed(channel_reason.clone()),
});
self.insert_item_queue.push_back(item);
} else {
if warn_max < 10 {
debug!("no series for cid {:?}", st.cid);
warn_max += 1;
}
}
ChannelState::FetchingSeriesId(..) => {
*chst = ChannelState::Ended;
}
ChannelState::Error(_) => {
ChannelState::Created(series, ..) => {
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
series: series.clone(),
status: ChannelStatus::Closed(channel_reason.clone()),
});
self.insert_item_queue.push_back(item);
*chst = ChannelState::Ended;
}
ChannelState::Error(..) => {
*chst = ChannelState::Ended;
}
ChannelState::Ended => {}
@@ -831,14 +824,7 @@ impl CaConn {
let mut not_alive_count = 0;
for (_, st) in &self.channels {
match st {
ChannelState::Creating { cid, ts_beg, cssid: _ } => {
if false && tsnow.duration_since(*ts_beg) >= Duration::from_millis(10000) {
let name = self.name_by_cid.get(cid);
// TODO channel create timed out how to let daemon know?
warn!("channel Creating timed out {} {:?}", cid.0, name);
}
}
ChannelState::Created(st) => {
ChannelState::Created(_, st) => {
if tsnow.duration_since(st.ts_alive_last) >= Duration::from_millis(10000) {
not_alive_count += 1;
} else {
@@ -864,7 +850,7 @@ impl CaConn {
let timenow = SystemTime::now();
for (_, st) in &mut self.channels {
match st {
ChannelState::Init(cssid) => {
ChannelState::Init(_cssid) => {
// TODO need last-save-ts for this state.
}
ChannelState::Creating {
@@ -874,7 +860,10 @@ impl CaConn {
} => {
// TODO need last-save-ts for this state.
}
ChannelState::Created(st) => {
ChannelState::FetchingSeriesId(..) => {
// TODO ?
}
ChannelState::Created(series, st) => {
// TODO if we don't wave a series id yet, dont' save? write-ampl.
let msp = info_store_msp_from_time(timenow.clone());
@@ -882,7 +871,7 @@ impl CaConn {
st.info_store_msp_last = msp;
let item = QueryItem::ChannelInfo(ChannelInfoItem {
ts_msp: msp,
series: st.series.clone().unwrap_or(SeriesId::new(0)),
series: series.clone(),
ivl: st.item_recv_ivl_ema.ema().ema(),
interest: 0.,
evsize: 0,
@@ -938,13 +927,14 @@ impl CaConn {
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
let cssid = match ch_s {
ChannelState::Creating { cssid, cid, ts_beg } => cssid.clone(),
ChannelState::Created(_series, st2) => st2.cssid.clone(),
_ => {
let e = Error::with_msg_no_trace("channel_to_evented bad state");
let name = self.name_by_cid.get(&cid);
let e = Error::with_msg_no_trace(format!("channel_to_evented bad state {name:?} {ch_s:?}"));
return Err(e);
}
};
*ch_s = ChannelState::Created(CreatedState {
let created_state = CreatedState {
cssid,
cid,
sid,
@@ -962,9 +952,9 @@ impl CaConn {
insert_recv_ivl_last: tsnow,
insert_next_earliest: tsnow,
muted_before: 0,
series: Some(series),
info_store_msp_last: info_store_msp_from_time(SystemTime::now()),
});
};
*ch_s = ChannelState::Created(series, created_state);
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
let _cd = ChannelDescDecoded {
@@ -1150,7 +1140,7 @@ impl CaConn {
let mut series_2 = None;
let ch_s = self.channels.get_mut(&cid).unwrap();
match ch_s {
ChannelState::Created(st) => {
ChannelState::Created(_series, st) => {
st.ts_alive_last = tsnow;
st.item_recv_ivl_ema.tick(tsnow);
let scalar_type = st.scalar_type.clone();
@@ -1389,13 +1379,13 @@ impl CaConn {
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
let cssid = match ch_s {
ChannelState::Creating { cssid, cid, ts_beg } => cssid.clone(),
ChannelState::Creating { cssid, .. } => cssid.clone(),
_ => {
let e = Error::with_msg_no_trace("channel_to_evented bad state");
return Ready(Some(Err(e)));
}
};
*ch_s = ChannelState::Created(CreatedState {
let created_state = CreatedState {
cssid,
cid,
sid,
@@ -1412,9 +1402,9 @@ impl CaConn {
insert_recv_ivl_last: tsnow,
insert_next_earliest: tsnow,
muted_before: 0,
series: None,
info_store_msp_last: info_store_msp_from_time(SystemTime::now()),
});
};
*ch_s = ChannelState::FetchingSeriesId(created_state);
// TODO handle error in different way. Should most likely not abort.
let _cd = ChannelDescDecoded {
name: name.clone(),