From 1ba74e134216dc0186c0596b311c9e9ea0a49b38 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 17 Feb 2023 14:24:55 +0100 Subject: [PATCH] Refactor --- netfetch/src/ca/conn.rs | 96 ++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 53 deletions(-) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 4f1ceb3..c52c6ca 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -140,6 +140,7 @@ struct CreatedState { sid: u32, scalar_type: ScalarType, shape: Shape, + #[allow(unused)] ts_created: Instant, ts_alive_last: Instant, state: MonitoringState, @@ -151,7 +152,6 @@ struct CreatedState { insert_recv_ivl_last: Instant, insert_next_earliest: Instant, muted_before: u32, - series: Option, info_store_msp_last: u32, } @@ -164,7 +164,8 @@ enum ChannelState { cid: Cid, ts_beg: Instant, }, - Created(CreatedState), + FetchingSeriesId(CreatedState), + Created(SeriesId, CreatedState), Error(ChannelError), Ended, } @@ -172,33 +173,34 @@ enum ChannelState { impl ChannelState { fn to_info(&self, name: String, addr: SocketAddrV4) -> ChannelStateInfo { let channel_connected_info = match self { - ChannelState::Init(_) => ChannelConnectedInfo::Disconnected, + ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, - ChannelState::Created(_) => ChannelConnectedInfo::Connected, - ChannelState::Error(_) => ChannelConnectedInfo::Error, + ChannelState::FetchingSeriesId(..) => ChannelConnectedInfo::Connecting, + ChannelState::Created(..) => ChannelConnectedInfo::Connected, + ChannelState::Error(..) => ChannelConnectedInfo::Error, ChannelState::Ended => ChannelConnectedInfo::Ended, }; let scalar_type = match self { - ChannelState::Created(s) => Some(s.scalar_type.clone()), + ChannelState::Created(_series, s) => Some(s.scalar_type.clone()), _ => None, }; let shape = match self { - ChannelState::Created(s) => Some(s.shape.clone()), + ChannelState::Created(_series, s) => Some(s.shape.clone()), _ => None, }; let ts_created = match self { - ChannelState::Created(s) => Some(s.ts_created.clone()), + ChannelState::Created(_series, s) => Some(s.ts_created.clone()), _ => None, }; let ts_event_last = match self { - ChannelState::Created(s) => match &s.state { + ChannelState::Created(_series, s) => match &s.state { MonitoringState::Evented(_, s) => Some(s.ts_last), _ => None, }, _ => None, }; let item_recv_ivl_ema = match self { - ChannelState::Created(s) => { + ChannelState::Created(_series, s) => { let ema = s.item_recv_ivl_ema.ema(); if ema.update_count() == 0 { None @@ -209,7 +211,7 @@ impl ChannelState { _ => None, }; let series = match self { - ChannelState::Created(s) => s.series.clone(), + ChannelState::Created(series, _) => Some(series.clone()), _ => None, }; let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); @@ -706,10 +708,6 @@ impl CaConn { } } - fn cid_by_name(&mut self, name: &str) -> Cid { - Self::cid_by_name_expl(name, &mut self.cid_by_name, &mut self.name_by_cid, &mut self.cid_store) - } - fn name_by_cid(&self, cid: Cid) -> Option<&str> { self.name_by_cid.get(&cid).map(|x| x.as_str()) } @@ -726,32 +724,27 @@ impl CaConn { fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) { trace!("channel_state_on_shutdown channels {}", self.channels.len()); - let mut warn_max = 0; for (_cid, chst) in &mut self.channels { match chst { - ChannelState::Init(cssid) => { + ChannelState::Init(..) => { *chst = ChannelState::Ended; } ChannelState::Creating { .. } => { *chst = ChannelState::Ended; } - ChannelState::Created(st) => { - if let Some(series) = &st.series { - let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: SystemTime::now(), - series: series.clone(), - status: ChannelStatus::Closed(channel_reason.clone()), - }); - self.insert_item_queue.push_back(item); - } else { - if warn_max < 10 { - debug!("no series for cid {:?}", st.cid); - warn_max += 1; - } - } + ChannelState::FetchingSeriesId(..) => { *chst = ChannelState::Ended; } - ChannelState::Error(_) => { + ChannelState::Created(series, ..) => { + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: SystemTime::now(), + series: series.clone(), + status: ChannelStatus::Closed(channel_reason.clone()), + }); + self.insert_item_queue.push_back(item); + *chst = ChannelState::Ended; + } + ChannelState::Error(..) => { *chst = ChannelState::Ended; } ChannelState::Ended => {} @@ -831,14 +824,7 @@ impl CaConn { let mut not_alive_count = 0; for (_, st) in &self.channels { match st { - ChannelState::Creating { cid, ts_beg, cssid: _ } => { - if false && tsnow.duration_since(*ts_beg) >= Duration::from_millis(10000) { - let name = self.name_by_cid.get(cid); - // TODO channel create timed out how to let daemon know? - warn!("channel Creating timed out {} {:?}", cid.0, name); - } - } - ChannelState::Created(st) => { + ChannelState::Created(_, st) => { if tsnow.duration_since(st.ts_alive_last) >= Duration::from_millis(10000) { not_alive_count += 1; } else { @@ -864,7 +850,7 @@ impl CaConn { let timenow = SystemTime::now(); for (_, st) in &mut self.channels { match st { - ChannelState::Init(cssid) => { + ChannelState::Init(_cssid) => { // TODO need last-save-ts for this state. } ChannelState::Creating { @@ -874,7 +860,10 @@ impl CaConn { } => { // TODO need last-save-ts for this state. } - ChannelState::Created(st) => { + ChannelState::FetchingSeriesId(..) => { + // TODO ? + } + ChannelState::Created(series, st) => { // TODO if we don't wave a series id yet, dont' save? write-ampl. let msp = info_store_msp_from_time(timenow.clone()); @@ -882,7 +871,7 @@ impl CaConn { st.info_store_msp_last = msp; let item = QueryItem::ChannelInfo(ChannelInfoItem { ts_msp: msp, - series: st.series.clone().unwrap_or(SeriesId::new(0)), + series: series.clone(), ivl: st.item_recv_ivl_ema.ema().ema(), interest: 0., evsize: 0, @@ -938,13 +927,14 @@ impl CaConn { // TODO handle not-found error: let ch_s = self.channels.get_mut(&cid).unwrap(); let cssid = match ch_s { - ChannelState::Creating { cssid, cid, ts_beg } => cssid.clone(), + ChannelState::Created(_series, st2) => st2.cssid.clone(), _ => { - let e = Error::with_msg_no_trace("channel_to_evented bad state"); + let name = self.name_by_cid.get(&cid); + let e = Error::with_msg_no_trace(format!("channel_to_evented bad state {name:?} {ch_s:?}")); return Err(e); } }; - *ch_s = ChannelState::Created(CreatedState { + let created_state = CreatedState { cssid, cid, sid, @@ -962,9 +952,9 @@ impl CaConn { insert_recv_ivl_last: tsnow, insert_next_earliest: tsnow, muted_before: 0, - series: Some(series), info_store_msp_last: info_store_msp_from_time(SystemTime::now()), - }); + }; + *ch_s = ChannelState::Created(series, created_state); let scalar_type = ScalarType::from_ca_id(data_type)?; let shape = Shape::from_ca_count(data_count)?; let _cd = ChannelDescDecoded { @@ -1150,7 +1140,7 @@ impl CaConn { let mut series_2 = None; let ch_s = self.channels.get_mut(&cid).unwrap(); match ch_s { - ChannelState::Created(st) => { + ChannelState::Created(_series, st) => { st.ts_alive_last = tsnow; st.item_recv_ivl_ema.tick(tsnow); let scalar_type = st.scalar_type.clone(); @@ -1389,13 +1379,13 @@ impl CaConn { // TODO handle not-found error: let ch_s = self.channels.get_mut(&cid).unwrap(); let cssid = match ch_s { - ChannelState::Creating { cssid, cid, ts_beg } => cssid.clone(), + ChannelState::Creating { cssid, .. } => cssid.clone(), _ => { let e = Error::with_msg_no_trace("channel_to_evented bad state"); return Ready(Some(Err(e))); } }; - *ch_s = ChannelState::Created(CreatedState { + let created_state = CreatedState { cssid, cid, sid, @@ -1412,9 +1402,9 @@ impl CaConn { insert_recv_ivl_last: tsnow, insert_next_earliest: tsnow, muted_before: 0, - series: None, info_store_msp_last: info_store_msp_from_time(SystemTime::now()), - }); + }; + *ch_s = ChannelState::FetchingSeriesId(created_state); // TODO handle error in different way. Should most likely not abort. let _cd = ChannelDescDecoded { name: name.clone(),