diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 661f447..2ae1553 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -102,7 +102,6 @@ pub enum ChannelConnectedInfo { Connecting, Connected, Error, - Ended, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -121,6 +120,7 @@ pub struct ChannelStateInfo { #[serde(with = "ser_instant")] pub ts_event_last: Option, pub recv_count: Option, + pub recv_bytes: Option, // #[serde(skip_serializing_if = "Option::is_none")] pub item_recv_ivl_ema: Option, pub interest_score: f32, @@ -176,26 +176,74 @@ struct Cid(pub u32); #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] struct Subid(pub u32); +impl Subid { + pub fn to_u32(&self) -> u32 { + self.0 + } +} + #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] struct Sid(pub u32); +impl Sid { + pub fn to_u32(&self) -> u32 { + self.0 + } +} + #[derive(Clone, Debug)] enum ChannelError { CreateChanFail(ChannelStatusSeriesId), } #[derive(Debug, Clone)] -struct EventedState { - ts_last: Instant, - recv_count: u64, - recv_bytes: u64, +struct CreatingState { + tsbeg: Instant, + cssid: ChannelStatusSeriesId, + cid: Cid, } #[derive(Debug, Clone)] -enum MonitoringState { - FetchSeriesId, - AddingEvent(SeriesId), - Evented(SeriesId, EventedState), +struct MakingSeriesWriterState { + tsbeg: Instant, + channel: CreatedState, +} + +#[derive(Debug, Clone)] +struct EnableMonitoringState { + tsbeg: Instant, +} + +#[derive(Debug, Clone)] +struct MonitoringState { + tsbeg: Instant, +} + +#[derive(Debug, Clone)] +struct StopMonitoringForPollingState { + tsbeg: Instant, +} + +#[derive(Debug, Clone)] +struct PollingState { + tsbeg: Instant, + poll_ivl: Duration, +} + +#[derive(Debug)] +struct CreatedState22 { + tsbeg: Instant, + channel: CreatedState, + writer: SeriesWriter, + reading: ReadingState, +} + +#[derive(Debug, Clone)] +enum ReadingState { + EnableMonitoring(EnableMonitoringState), + Monitoring(MonitoringState), + StopMonitoringForPolling(StopMonitoringForPollingState), + Polling(PollingState), } #[derive(Debug, Clone)] @@ -203,10 +251,8 @@ struct CreatedState { cssid: ChannelStatusSeriesId, cid: Cid, sid: Sid, - #[allow(unused)] ts_created: Instant, ts_alive_last: Instant, - state: MonitoringState, ts_msp_last: u64, ts_msp_grid_last: u32, inserted_in_ts_msp: u64, @@ -215,25 +261,43 @@ struct CreatedState { insert_recv_ivl_last: Instant, insert_next_earliest: Instant, muted_before: u32, + insert_ivl_min_mus: u32, info_store_msp_last: u32, + recv_count: u64, + recv_bytes: u64, } -#[derive(Debug)] -struct WritableState { - created: CreatedState, - writer: SeriesWriter, +impl CreatedState { + fn dummy() -> Self { + let tsnow = Instant::now(); + Self { + cssid: ChannelStatusSeriesId::new(0), + cid: Cid(0), + sid: Sid(0), + ts_created: tsnow, + ts_alive_last: tsnow, + ts_msp_last: 0, + ts_msp_grid_last: 0, + inserted_in_ts_msp: 0, + insert_item_ivl_ema: IntervalEma::new(), + item_recv_ivl_ema: IntervalEma::new(), + insert_recv_ivl_last: tsnow, + insert_next_earliest: tsnow, + muted_before: 0, + insert_ivl_min_mus: 0, + info_store_msp_last: 0, + recv_count: 0, + recv_bytes: 0, + } + } } #[derive(Debug)] enum ChannelState { Init(ChannelStatusSeriesId), - Creating { - cssid: ChannelStatusSeriesId, - cid: Cid, - ts_beg: Instant, - }, - MakingSeriesWriter(CreatedState), - Writable(WritableState), + Creating(CreatingState), + MakingSeriesWriter(MakingSeriesWriterState), + Created(CreatedState22), Error(ChannelError), Ended(ChannelStatusSeriesId), } @@ -244,39 +308,37 @@ impl ChannelState { ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, ChannelState::MakingSeriesWriter(_) => ChannelConnectedInfo::Connecting, - ChannelState::Writable(..) => ChannelConnectedInfo::Connected, + ChannelState::Created(_) => ChannelConnectedInfo::Connected, ChannelState::Error(_) => ChannelConnectedInfo::Error, - ChannelState::Ended(_) => ChannelConnectedInfo::Ended, + ChannelState::Ended(_) => ChannelConnectedInfo::Disconnected, }; let scalar_type = match self { - ChannelState::Writable(s) => Some(s.writer.scalar_type().clone()), + ChannelState::Created(s) => Some(s.writer.scalar_type().clone()), _ => None, }; let shape = match self { - ChannelState::Writable(s) => Some(s.writer.shape().clone()), + ChannelState::Created(s) => Some(s.writer.shape().clone()), _ => None, }; let ts_created = match self { - ChannelState::Writable(s) => Some(s.created.ts_created.clone()), + ChannelState::Created(s) => Some(s.channel.ts_created.clone()), _ => None, }; let ts_event_last = match self { - ChannelState::Writable(s) => match &s.created.state { - MonitoringState::Evented(_, s) => Some(s.ts_last), - _ => None, - }, + ChannelState::Created(s) => Some(s.channel.ts_alive_last), _ => None, }; let recv_count = match self { - ChannelState::Writable(s) => match &s.created.state { - MonitoringState::Evented(_, s) => Some(s.recv_count), - _ => None, - }, + ChannelState::Created(s) => Some(s.channel.recv_count), + _ => None, + }; + let recv_bytes = match self { + ChannelState::Created(s) => Some(s.channel.recv_bytes), _ => None, }; let item_recv_ivl_ema = match self { - ChannelState::Writable(s) => { - let ema = s.created.item_recv_ivl_ema.ema(); + ChannelState::Created(s) => { + let ema = s.channel.item_recv_ivl_ema.ema(); if ema.update_count() == 0 { None } else { @@ -286,7 +348,7 @@ impl ChannelState { _ => None, }; let series = match self { - ChannelState::Writable(s) => Some(s.writer.sid()), + ChannelState::Created(s) => Some(s.writer.sid()), _ => None, }; let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); @@ -300,6 +362,7 @@ impl ChannelState { ts_created, ts_event_last, recv_count, + recv_bytes, item_recv_ivl_ema, interest_score, } @@ -308,9 +371,9 @@ impl ChannelState { fn cssid(&self) -> ChannelStatusSeriesId { match self { ChannelState::Init(cssid) => cssid.clone(), - ChannelState::Creating { cssid, .. } => cssid.clone(), - ChannelState::MakingSeriesWriter(st) => st.cssid.clone(), - ChannelState::Writable(st) => st.created.cssid.clone(), + ChannelState::Creating(st) => st.cssid.clone(), + ChannelState::MakingSeriesWriter(st) => st.channel.cssid.clone(), + ChannelState::Created(st) => st.channel.cssid.clone(), ChannelState::Error(e) => match e { ChannelError::CreateChanFail(cssid) => cssid.clone(), }, @@ -558,12 +621,11 @@ pub struct CaConn { remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, stats: Arc, - insert_ivl_min_mus: u64, + insert_ivl_min_mus: u32, conn_command_tx: Pin>>, conn_command_rx: Pin>>, conn_backoff: f32, conn_backoff_beg: f32, - inserts_counter: u64, extra_inserts_conf: ExtraInsertsConf, ioc_ping_last: Instant, ioc_ping_next: Instant, @@ -580,6 +642,7 @@ pub struct CaConn { writer_tx: Sender<(JobId, Result)>, writer_rx: Pin)>>>, tmp_ts_poll: SystemTime, + poll_tsnow: Instant, } impl Drop for CaConn { @@ -600,6 +663,7 @@ impl CaConn { ca_proto_stats: Arc, writer_establish_tx: Sender, ) -> Self { + let _ = channel_info_query_tx; let tsnow = Instant::now(); let (writer_tx, writer_rx) = async_channel::bounded(32); let (cq_tx, cq_rx) = async_channel::bounded(32); @@ -628,7 +692,6 @@ impl CaConn { conn_command_rx: Box::pin(cq_rx), conn_backoff: 0.02, conn_backoff_beg: 0.02, - inserts_counter: 0, extra_inserts_conf: ExtraInsertsConf::new(), ioc_ping_last: tsnow, ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng), @@ -645,6 +708,7 @@ impl CaConn { writer_tx, writer_rx: Box::pin(writer_rx), tmp_ts_poll: SystemTime::now(), + poll_tsnow: tsnow, } } @@ -890,7 +954,7 @@ impl CaConn { } } - fn handle_writer_establish_inner(&mut self, cid: Cid, wr: SeriesWriter) -> Result<(), Error> { + fn handle_writer_establish_inner(&mut self, cid: Cid, writer: SeriesWriter) -> Result<(), Error> { debug!("handle_writer_establish_inner {cid:?}"); // At this point we have created the channel and created a writer for that type and sid. // We do not yet monitor. @@ -901,18 +965,22 @@ impl CaConn { if let Some(chst) = self.channels.get_mut(&cid) { if let ChannelState::MakingSeriesWriter(st2) = chst { self.stats.get_series_id_ok.inc(); - - let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: self.tmp_ts_poll, - cssid: st2.cssid.clone(), - status: ChannelStatus::Opened, - }); - self.insert_item_queue.push_back(item); - { - let data_type = wr.scalar_type().to_ca_id()?; + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st2.channel.cssid.clone(), + status: ChannelStatus::Opened, + }); + self.insert_item_queue.push_back(item); + } + { + let data_type = writer.scalar_type().to_ca_id()?; if data_type > 6 { - error!("data type of series unexpected {} {:?}", data_type, wr.scalar_type()); + error!( + "data type of series unexpected {} {:?}", + data_type, + writer.scalar_type() + ); } let subid = self.subid_store.next(); self.cid_by_subid.insert(subid, cid); @@ -920,20 +988,22 @@ impl CaConn { let data_type_asked = data_type + 14; debug!("send out EventAdd for {cid:?}"); let ty = CaMsgTy::EventAdd(EventAdd { - sid: st2.sid.0, + sid: st2.channel.sid.to_u32(), data_type: data_type_asked, - data_count: wr.shape().to_ca_count()? as _, - subid: subid.0, + data_count: writer.shape().to_ca_count()? as u16, + subid: subid.to_u32(), }); - let msg = CaMsg::from_ty_ts(ty, Instant::now()); + let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow); let proto = self.proto.as_mut().unwrap(); proto.push_out(msg); } - st2.state = MonitoringState::AddingEvent(wr.sid()); - *chst = ChannelState::Writable(WritableState { - created: st2.clone(), - writer: wr, - }); + let created_state = CreatedState22 { + tsbeg: self.poll_tsnow, + channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()), + writer, + reading: ReadingState::EnableMonitoring(EnableMonitoringState { tsbeg: self.poll_tsnow }), + }; + *chst = ChannelState::Created(created_state); Ok(()) } else { warn!("TODO handle_series_lookup_result channel in bad state, reset"); @@ -1047,14 +1117,16 @@ impl CaConn { ChannelState::Init(cssid) => { *chst = ChannelState::Ended(cssid.clone()); } - ChannelState::Creating { cssid, .. } => { - *chst = ChannelState::Ended(cssid.clone()); + ChannelState::Creating(st2) => { + *chst = ChannelState::Ended(st2.cssid.clone()); } ChannelState::MakingSeriesWriter(st) => { - *chst = ChannelState::Ended(st.cssid.clone()); + *chst = ChannelState::Ended(st.channel.cssid.clone()); } - ChannelState::Writable(st2) => { - let cssid = st2.created.cssid.clone(); + ChannelState::Created(st2) => { + let cssid = st2.channel.cssid.clone(); + // TODO should call the proper channel-close handler which in turn emits the status item. + // Make sure I record the reason for the "Close": user command, IOC error, etc.. let item = QueryItem::ChannelStatus(ChannelStatusItem { ts: self.tmp_ts_poll, cssid: cssid.clone(), @@ -1106,8 +1178,9 @@ impl CaConn { let mut not_alive_count = 0; for (_, st) in &self.channels { match st { - ChannelState::Writable(st) => { - if tsnow.duration_since(st.created.ts_alive_last) >= Duration::from_millis(10000) { + ChannelState::Created(st2) => { + if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) { + warn!("TODO assume channel not alive because nothing received, but should do CAGET"); not_alive_count += 1; } else { alive_count += 1; @@ -1126,29 +1199,25 @@ impl CaConn { let timenow = self.tmp_ts_poll; for (_, st) in &mut self.channels { match st { - ChannelState::Init(_cssid) => { + ChannelState::Init(..) => { // TODO need last-save-ts for this state. } - ChannelState::Creating { - cid: _, - ts_beg: _, - cssid: _, - } => { + ChannelState::Creating(..) => { // TODO need last-save-ts for this state. } ChannelState::MakingSeriesWriter(..) => { // TODO ? } - ChannelState::Writable(st) => { - let created = &mut st.created; + ChannelState::Created(st) => { + let crst = &mut st.channel; // TODO if we don't wave a series id yet, dont' save? write-ampl. let msp = info_store_msp_from_time(timenow.clone()); - if msp != created.info_store_msp_last { - created.info_store_msp_last = msp; + if msp != crst.info_store_msp_last { + crst.info_store_msp_last = msp; let item = QueryItem::ChannelInfo(ChannelInfoItem { ts_msp: msp, series: st.writer.sid(), - ivl: created.item_recv_ivl_ema.ema().ema(), + ivl: crst.item_recv_ivl_ema.ema().ema(), interest: 0., evsize: 0, }); @@ -1190,95 +1259,35 @@ impl CaConn { }; // debug!("handle_event_add_res {ev:?}"); match ch_s { - ChannelState::Writable(st) => { - let created = &mut st.created; - created.ts_alive_last = tsnow; - created.item_recv_ivl_ema.tick(tsnow); - let series = match &mut created.state { - MonitoringState::AddingEvent(series) => { - let series = series.clone(); - created.state = MonitoringState::Evented( - series.clone(), - EventedState { - ts_last: tsnow, - recv_count: 0, - recv_bytes: 0, - }, - ); - series - } - MonitoringState::Evented(series, st) => { - st.ts_last = tsnow; - series.clone() - } - _ => { - let e = Error::from_string(format!("unexpected state: EventAddRes while having {:?}", created)); - error!("{e}"); - return Err(e); - } - }; - // TODO should attach these counters already to Writable state. - if let MonitoringState::Evented(_, st2) = &mut created.state { - st2.recv_count += 1; - st2.recv_bytes += ev.payload_len as u64; + ChannelState::Created(st) => match &mut st.reading { + ReadingState::EnableMonitoring(st2) => { + let dt = st2.tsbeg.elapsed().as_secs_f32(); + debug!("change to Monitoring after dt {dt:.0} ms"); + st.reading = ReadingState::Monitoring(MonitoringState { tsbeg: tsnow }); + Self::event_add_ingest( + ev, + &mut st.channel, + &mut st.writer, + &mut self.insert_item_queue, + tsnow, + self.tmp_ts_poll, + self.stats.as_ref(), + )?; } - let ts_local = { - let ts = self.tmp_ts_poll; - let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); - epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 - }; - let ts = ev.value.ts; - let ts_diff = ts.abs_diff(ts_local); - self.stats.ca_ts_off().ingest((ts_diff / MS) as u32); - if tsnow >= created.insert_next_earliest { - { - created.muted_before = 0; - created.insert_item_ivl_ema.tick(tsnow); - let em = created.insert_item_ivl_ema.ema(); - let ema = em.ema(); - let ivl_min = (self.insert_ivl_min_mus as f32) * 1e-6; - let dt = (ivl_min - ema).max(0.) / em.k(); - created.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64); - } - Self::check_ev_value_data(&ev.value.data, st.writer.scalar_type())?; - { - let val: DataValue = ev.value.data.into(); - st.writer - .write( - TsNano::from_ns(ts), - TsNano::from_ns(ts_local), - val, - &mut self.insert_item_queue, - ) - .map_err(|e| Error::from_string(e))?; - self.inserts_counter += 1; - } - } else { - self.stats.channel_fast_item_drop.inc(); - if tsnow.duration_since(created.insert_recv_ivl_last) >= Duration::from_millis(10000) { - created.insert_recv_ivl_last = tsnow; - let ema = created.insert_item_ivl_ema.ema(); - let item = IvlItem { - series: series.clone(), - ts, - ema: ema.ema(), - emd: ema.emv().sqrt(), - }; - self.insert_item_queue.push_back(QueryItem::Ivl(item)); - } - if false && created.muted_before == 0 { - let ema = created.insert_item_ivl_ema.ema(); - let item = MuteItem { - series: series.clone(), - ts, - ema: ema.ema(), - emd: ema.emv().sqrt(), - }; - self.insert_item_queue.push_back(QueryItem::Mute(item)); - } - created.muted_before = 1; + ReadingState::Monitoring(_st2) => { + let crst = &mut st.channel; + let writer = &mut st.writer; + let iiq = &mut self.insert_item_queue; + let stats = self.stats.as_ref(); + Self::event_add_ingest(ev, crst, writer, iiq, tsnow, self.tmp_ts_poll, stats)?; } - } + ReadingState::StopMonitoringForPolling(..) => { + error!("TODO handle_event_add_res handle StopMonitoringForPolling"); + } + ReadingState::Polling(..) => { + error!("TODO handle_event_add_res handle Polling"); + } + }, _ => { // TODO count instead of print error!("unexpected state: EventAddRes while having {ch_s:?}"); @@ -1287,6 +1296,74 @@ impl CaConn { Ok(()) } + fn event_add_ingest( + ev: proto::EventAddRes, + crst: &mut CreatedState, + writer: &mut SeriesWriter, + iiq: &mut VecDeque, + tsnow: Instant, + stnow: SystemTime, + stats: &CaConnStats, + ) -> Result<(), Error> { + crst.ts_alive_last = tsnow; + crst.item_recv_ivl_ema.tick(tsnow); + crst.recv_count += 1; + crst.recv_bytes += ev.payload_len as u64; + let series = writer.sid(); + // TODO should attach these counters already to Writable state. + let ts_local = { + let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap(); + epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 + }; + let ts = ev.value.ts; + let ts_diff = ts.abs_diff(ts_local); + stats.ca_ts_off().ingest((ts_diff / MS) as u32); + if tsnow >= crst.insert_next_earliest { + { + crst.muted_before = 0; + crst.insert_item_ivl_ema.tick(tsnow); + let em = crst.insert_item_ivl_ema.ema(); + let ema = em.ema(); + let ivl_min = (crst.insert_ivl_min_mus as f32) * 1e-6; + let dt = (ivl_min - ema).max(0.) / em.k(); + crst.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64); + } + Self::check_ev_value_data(&ev.value.data, writer.scalar_type())?; + { + let val: DataValue = ev.value.data.into(); + writer + .write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq) + .map_err(|e| Error::from_string(e))?; + } + Ok(()) + } else { + stats.channel_fast_item_drop.inc(); + if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) { + crst.insert_recv_ivl_last = tsnow; + let ema = crst.insert_item_ivl_ema.ema(); + let item = IvlItem { + series: series.clone(), + ts, + ema: ema.ema(), + emd: ema.emv().sqrt(), + }; + iiq.push_back(QueryItem::Ivl(item)); + } + if false && crst.muted_before == 0 { + let ema = crst.insert_item_ivl_ema.ema(); + let item = MuteItem { + series: series.clone(), + ts, + ema: ema.ema(), + emd: ema.emv().sqrt(), + }; + iiq.push_back(QueryItem::Mute(item)); + } + crst.muted_before = 1; + Ok(()) + } + } + fn check_ev_value_data(data: &proto::CaDataValue, scalar_type: &ScalarType) -> Result<(), Error> { use crate::ca::proto::CaDataScalarValue; use crate::ca::proto::CaDataValue; @@ -1372,14 +1449,14 @@ impl CaConn { } } - fn check_channels_state_init(&mut self, do_wake_again: &mut bool) -> Result<(), Error> { + fn check_channels_state_init(&mut self, tsnow: Instant, do_wake_again: &mut bool) -> Result<(), Error> { // TODO profile, efficient enough? if self.init_state_count == 0 { return Ok(()); } let keys: Vec = self.channels.keys().map(|x| *x).collect(); for cid in keys { - match self.channels.get_mut(&cid).unwrap() { + match self.channels.get(&cid).unwrap() { ChannelState::Init(cssid) => { let cssid = cssid.clone(); let name = self @@ -1394,17 +1471,17 @@ impl CaConn { cid: cid.0, channel: name.into(), }), - Instant::now(), + tsnow, ); *do_wake_again = true; self.proto.as_mut().unwrap().push_out(msg); // TODO handle not-found error: let ch_s = self.channels.get_mut(&cid).unwrap(); - *ch_s = ChannelState::Creating { + *ch_s = ChannelState::Creating(CreatingState { + tsbeg: tsnow, cssid, cid, - ts_beg: Instant::now(), - }; + }); self.init_state_count -= 1; } _ => {} @@ -1420,7 +1497,7 @@ impl CaConn { let mut ts1 = Instant::now(); // TODO unify with Listen state where protocol gets polled as well. let mut do_wake_again = false; - self.check_channels_state_init(&mut do_wake_again)?; + self.check_channels_state_init(ts1, &mut do_wake_again)?; let ts2 = Instant::now(); self.stats .time_check_channels_state_init @@ -1543,10 +1620,23 @@ impl CaConn { } fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> { - // TODO handle cid-not-found which can also indicate peer error. let cid = Cid(k.cid); let sid = Sid(k.sid); - let name = if let Some(x) = self.name_by_cid(cid) { + let channels = &mut self.channels; + let name_by_cid = &self.name_by_cid; + // TODO handle not-found error: + let ch_s = channels.get_mut(&cid).unwrap(); + let cssid = match ch_s { + ChannelState::Creating(st) => st.cssid.clone(), + _ => { + // TODO handle in better way: + // Remove channel and emit notice that channel is removed with reason. + let e = Error::with_msg_no_trace("handle_peer_ready bad state"); + return Err(e); + } + }; + // TODO handle cid-not-found which can also indicate peer error. + let name = if let Some(x) = name_by_cid.get(&cid) { x.to_string() } else { return Err(Error::with_msg_no_trace(format!("no name for {cid:?}"))); @@ -1557,24 +1647,12 @@ impl CaConn { } let scalar_type = ScalarType::from_ca_id(k.data_type)?; let shape = Shape::from_ca_count(k.data_count)?; - // TODO handle not-found error: - let ch_s = self.channels.get_mut(&cid).unwrap(); - let cssid = match ch_s { - ChannelState::Creating { cssid, .. } => cssid.clone(), - _ => { - // TODO handle in better way: - // Remove channel and emit notice that channel is removed with reason. - let e = Error::with_msg_no_trace("handle_peer_ready bad state"); - return Err(e); - } - }; - let created_state = CreatedState { + let channel = CreatedState { cssid, cid, sid, ts_created: tsnow, ts_alive_last: tsnow, - state: MonitoringState::FetchSeriesId, ts_msp_last: 0, ts_msp_grid_last: 0, inserted_in_ts_msp: u64::MAX, @@ -1583,9 +1661,12 @@ impl CaConn { insert_recv_ivl_last: tsnow, insert_next_earliest: tsnow, muted_before: 0, + insert_ivl_min_mus: self.insert_ivl_min_mus, info_store_msp_last: info_store_msp_from_time(self.tmp_ts_poll), + recv_count: 0, + recv_bytes: 0, }; - *ch_s = ChannelState::MakingSeriesWriter(created_state); + *ch_s = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel }); let name = self .name_by_cid(cid) .ok_or_else(|| Error::from_string(format!("no name for cid {cid:?}")))?; @@ -1865,7 +1946,7 @@ impl CaConn { fn tick_writers(&mut self) -> Result<(), Error> { for (k, st) in &mut self.channels { - if let ChannelState::Writable(st2) = st { + if let ChannelState::Created(st2) = st { st2.writer .tick(&mut self.insert_item_queue) .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; @@ -1888,110 +1969,6 @@ impl CaConn { self.insert_item_queue.is_empty() && self.storage_insert_sender.is_idle() } - fn attempt_flush_storage_queue(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { - use Poll::*; - let (qu, sd, stats) = Self::storage_queue_vars(&mut self); - { - // TODO use stats histogram type to test the native prometheus histogram feature - let n = qu.len(); - if n >= 128 { - stats.storage_queue_above_128().inc(); - } else if n >= 32 { - stats.storage_queue_above_32().inc(); - } else if n >= 8 { - stats.storage_queue_above_8().inc(); - } - } - let mut have_progress = false; - let mut i = 0; - loop { - i += 1; - if i > 120 { - break; - } - if !sd.has_sender() { - return Err(Error::with_msg_no_trace("attempt_flush_storage_queue no more sender")); - } - if sd.is_idle() { - if qu.len() != 0 { - let item: VecDeque<_> = qu.drain(..).collect(); - stats.storage_queue_send().add(item.len() as _); - sd.as_mut().send_pin(item); - } else { - break; - } - } - if sd.is_sending() { - match sd.poll_unpin(cx) { - Ready(Ok(())) => { - have_progress = true; - } - Ready(Err(_)) => { - return Err(Error::with_msg_no_trace( - "attempt_flush_storage_queue can not send into channel", - )); - } - Pending => { - stats.storage_queue_pending().inc(); - return Ok(Pending); - } - } - } - } - if have_progress { - Ok(Ready(Some(()))) - } else { - Ok(Ready(None)) - } - } - - // TODO refactor, put together in separate type: - fn storage_queue_vars( - this: &mut CaConn, - ) -> ( - &mut VecDeque, - &mut Pin>>>, - &CaConnStats, - ) { - ( - &mut this.insert_item_queue, - &mut this.storage_insert_sender, - &this.stats, - ) - } - - fn attempt_flush_writer_establish(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { - use Poll::*; - let sd = self.writer_establish_tx.as_mut(); - if !sd.has_sender() { - return Err(Error::with_msg_no_trace( - "attempt_flush_channel_info_query no more sender", - )); - } - if sd.is_idle() { - if let Some(item) = self.writer_establish_qu.pop_front() { - trace3!("send EstablishWorkerJob"); - let sd = self.writer_establish_tx.as_mut(); - sd.send_pin(item); - } - } - let sd = &mut self.writer_establish_tx; - if sd.is_sending() { - match sd.poll_unpin(cx) { - Ready(Ok(())) => { - debug!("flushed writer establish job"); - Ok(Ready(Some(()))) - } - Ready(Err(_)) => Err(Error::with_msg_no_trace( - "attempt_flush_channel_info_query can not send into channel", - )), - Pending => Ok(Pending), - } - } else { - Ok(Ready(None)) - } - } - fn attempt_flush_queue( qu: &mut VecDeque, sp: &mut Pin>>, @@ -2046,19 +2023,38 @@ impl CaConn { Ok(Ready(None)) } } +} - fn send_individual(qu: &mut VecDeque) -> Option { - qu.pop_front() - } - - fn send_batched(qu: &mut VecDeque) -> Option> { - let n = qu.len(); - if n == 0 { - None - } else { - let batch = qu.drain(..n.min(N)).collect(); - Some(batch) +// $have is tuple (have_progress, have_pending)) +macro_rules! flush_queue { + ($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr) => { + let obj = $self.as_mut().get_mut(); + let qu = &mut obj.$qu; + let sp = &mut obj.$sp; + match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id) { + Ok(Ready(Some(()))) => { + *$have.0 |= true; + } + Ok(Ready(None)) => {} + Ok(Pending) => { + *$have.1 |= true; + } + Err(e) => break Ready(Some(Err(e))), } + }; +} + +fn send_individual(qu: &mut VecDeque) -> Option { + qu.pop_front() +} + +fn send_batched(qu: &mut VecDeque) -> Option> { + let n = qu.len(); + if n == 0 { + None + } else { + let batch = qu.drain(..n.min(N)).collect(); + Some(batch) } } @@ -2067,6 +2063,7 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + self.poll_tsnow = Instant::now(); self.tmp_ts_poll = SystemTime::now(); let poll_ts1 = Instant::now(); self.stats.poll_count().inc(); @@ -2103,41 +2100,51 @@ impl Stream for CaConn { Err(e) => break Ready(Some(Err(e))), } - if !self.is_shutdown() { - let obj = self.as_mut().get_mut(); - let qu = &mut obj.insert_item_queue; - let sp = &mut obj.storage_insert_sender; - match Self::attempt_flush_queue(qu, sp, Self::send_batched::<32, _>, 32, cx, "strg") { - Ok(Ready(Some(()))) => { - have_progress = true; - } - Ok(Ready(None)) => {} - Ok(Pending) => { - have_pending = true; - } - Err(e) => break Ready(Some(Err(e))), + { + // TODO use stats histogram type to test the native prometheus histogram feature + let qu = &self.insert_item_queue; + let stats = &self.stats; + let n = qu.len(); + if n >= 128 { + stats.storage_queue_above_128().inc(); + } else if n >= 32 { + stats.storage_queue_above_32().inc(); + } else if n >= 8 { + stats.storage_queue_above_8().inc(); } } - let lts3 = Instant::now(); + let lts2; + let lts3; if !self.is_shutdown() { - let obj = self.as_mut().get_mut(); - let qu = &mut obj.writer_establish_qu; - let sp = &mut obj.writer_establish_tx; - match Self::attempt_flush_queue(qu, sp, Self::send_individual, 32, cx, "wr-est") { - Ok(Ready(Some(()))) => { - have_progress = true; - } - Ok(Ready(None)) => {} - Ok(Pending) => { - have_pending = true; - } - Err(e) => break Ready(Some(Err(e))), - } - } + flush_queue!( + self, + insert_item_queue, + storage_insert_sender, + send_batched::<48, _>, + 32, + (&mut have_progress, &mut have_pending), + "strg", + cx + ); + lts2 = Instant::now(); - let lts2 = Instant::now(); + flush_queue!( + self, + writer_establish_qu, + writer_establish_tx, + send_individual, + 32, + (&mut have_progress, &mut have_pending), + "wrest", + cx + ); + lts3 = Instant::now(); + } else { + lts2 = Instant::now(); + lts3 = Instant::now(); + } match self.as_mut().handle_writer_establish_result(cx) { Ok(Ready(Some(()))) => {