diff --git a/.github/workflows/build-rhel7.yml b/.github/workflows/build-rhel7.yml index 3ac5506..00fdff5 100644 --- a/.github/workflows/build-rhel7.yml +++ b/.github/workflows/build-rhel7.yml @@ -100,17 +100,21 @@ jobs: - run: "echo version: TT$(cat daqver)TT" - run: "echo 'version: [${{steps.daqingest_version_set.outputs.daqingest_version}}]'" - run: "echo 'version: [${{env.DAQVER}}]'" - - run: "mkdir daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7" - - run: "cp ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7/daqingest" - - run: "tar -czf daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7.tar.gz daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7" + - run: echo "SELFPKG=daqingest-$DAQVER" >> $GITHUB_ENV + - run: echo "SELFPKGTGT=$SELFPKG-amd64-rhel7" >> $GITHUB_ENV + - run: echo SELFPKG $SELFPKG + - run: echo SELFPKGTGT $SELFPKGTGT + - run: mkdir $SELFPKGTGT + - run: cp ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest $SELFPKGTGT/daqingest + - run: tar -czf $SELFPKGTGT.tar.gz $SELFPKGTGT - uses: actions/upload-artifact@v3 with: - name: daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}} + name: ${{env.SELFPKGTGT}} path: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest - - run: echo "{\"tag_name\":\"buildaction\", \"name\":\"daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}\", \"draft\":true, \"prerelease\":true}" > create-rel.json + - run: echo "{\"tag_name\":\"$DAQVER\", \"name\":\"$SELFPKG\", \"draft\":true, \"prerelease\":true}" > create-rel.json - run: "curl -v -o rel.json -L -X POST -H content-type:application/json -H 'accept:application/vnd.github+json' -H 'authorization:bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T create-rel.json https://api.github.com/repos/paulscherrerinstitute/daqingest/releases" - run: cat rel.json - - run: "RELID=$(python -c 'import json; x=json.load(open(\"rel.json\")); print(x[\"id\"])') && curl -v -o relass.json -L -X POST -H content-type:application/octet-stream -H 'accept:application/vnd.github+json' -H 'authorization:Bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest https://uploads.github.com/repos/paulscherrerinstitute/daqingest/releases/$RELID/assets?name=daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7" + - run: "RELID=$(python -c 'import json; x=json.load(open(\"rel.json\")); print(x[\"id\"])') && curl -v -o relass.json -L -X POST -H content-type:application/octet-stream -H 'accept:application/vnd.github+json' -H 'authorization:bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest https://uploads.github.com/repos/paulscherrerinstitute/daqingest/releases/$RELID/assets?name=$SELFPKGTGT" if: false - - run: "RELID=$(python -c 'import json; x=json.load(open(\"rel.json\")); print(x[\"id\"])') && curl -v -o relass.json -L -X POST -H content-type:application/octet-stream -H 'accept:application/vnd.github+json' -H 'authorization:Bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7.tar.gz https://uploads.github.com/repos/paulscherrerinstitute/daqingest/releases/$RELID/assets?name=daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7.tar.gz" + - run: "RELID=$(python -c 'import json; x=json.load(open(\"rel.json\")); print(x[\"id\"])') && curl -v -o relass.json -L -X POST -H content-type:application/octet-stream -H 'accept:application/vnd.github+json' -H 'authorization:bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T $SELFPKGTGT.tar.gz https://uploads.github.com/repos/paulscherrerinstitute/daqingest/releases/$RELID/assets?name=$SELFPKGTGT.tar.gz" - run: cat relass.json diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 05b017e..da6e0d1 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -13,11 +13,11 @@ bsread = [] clap = { version = "4.4.2", features = ["derive", "cargo"] } tracing = "0.1" serde = { version = "1.0", features = ["derive"] } -tokio-postgres = "0.7.9" +tokio-postgres = "0.7.10" async-channel = "1.9.0" futures-util = "0.3" chrono = "0.4" -bytes = "1.4.0" +bytes = "1.5.0" libc = "0.2" err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 730ecbc..30ee0c8 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -564,7 +564,16 @@ impl Daemon { } async fn handle_shutdown(&mut self) -> Result<(), Error> { - todo!("handle_shutdown"); + error!("TODO handle_shutdown"); + // TODO make sure we: + // set a flag so that we don't attempt to use resources any longer (why could that happen?) + // does anybody might still want to communicate with us? can't be excluded. + // send shutdown signal to everyone. + // drop our ends of channels to workers (gate them behind option?). + // await the connection sets. + // await other workers that we've spawned. + self.connset_ctrl.shutdown().await?; + Ok(()) } #[cfg(DISABLED)] diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index e6b04ca..667a2f7 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -12,7 +12,7 @@ serde_yaml = "0.9.16" tokio-stream = { version = "0.1", features = ["fs"] } tracing = "0.1.37" async-channel = "1.9.0" -bytes = "1.4" +bytes = "1.5" arrayref = "0.3" byteorder = "1.4" futures-util = "0.3" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index cdff375..eaa2ac5 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -159,6 +159,8 @@ struct CreatedState { cid: Cid, #[allow(unused)] sid: u32, + data_type: u16, + data_count: u16, scalar_type: ScalarType, shape: Shape, #[allow(unused)] @@ -468,11 +470,8 @@ pub struct CaConn { ioc_ping_start: Option, cmd_res_queue: VecDeque, ca_conn_event_out_queue: VecDeque, - channel_info_query_tx: Sender, - series_lookup_schedule: BTreeMap, - series_lookup_futs: FuturesUnordered< - Pin), Error>> + Send>>, - >, + channel_info_query_queue: VecDeque, + channel_info_query_sending: SenderPolling, time_binners: BTreeMap, ts_earliest_warn_poll_slow: Instant, } @@ -521,9 +520,8 @@ impl CaConn { ioc_ping_start: None, cmd_res_queue: VecDeque::new(), ca_conn_event_out_queue: VecDeque::new(), - channel_info_query_tx, - series_lookup_schedule: BTreeMap::new(), - series_lookup_futs: FuturesUnordered::new(), + channel_info_query_queue: VecDeque::new(), + channel_info_query_sending: SenderPolling::new(channel_info_query_tx), time_binners: BTreeMap::new(), ts_earliest_warn_poll_slow: Instant::now(), } @@ -639,6 +637,49 @@ impl CaConn { // TODO return the result } + fn handle_series_lookup_result( + &mut self, + res: Result, + ) -> Result<(), Error> { + match res { + Ok(res) => { + let series = res.series.into_inner(); + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: SystemTime::now(), + series: series.clone(), + status: ChannelStatus::Opened, + }); + self.insert_item_queue.push_back(item); + if let Some(cid) = self.cid_by_name.get(&res.channel) { + if let Some(chst) = self.channels.get(cid) { + if let ChannelState::FetchingSeriesId(st2) = chst { + let cid = st2.cid.clone(); + let sid = st2.sid; + let data_type = st2.data_type; + let data_count = st2.data_count; + match self.channel_to_evented(cid, sid, data_type, data_count, series) { + Ok(_) => {} + Err(e) => { + error!("channel_to_evented {e}"); + } + } + } else { + warn!("TODO channel in bad state, reset"); + } + } else { + warn!("TODO channel in bad state, reset"); + } + } else { + warn!("TODO channel in bad state, reset"); + } + } + Err(e) => { + error!("handle_series_lookup_result got error {e}"); + } + } + Ok(()) + } + fn handle_conn_command(&mut self, cx: &mut Context) -> Poll>> { // TODO if this loops for too long time, yield and make sure we get wake up again. use Poll::*; @@ -664,7 +705,10 @@ impl CaConn { self.cmd_shutdown(); Ready(Some(Ok(()))) } - ConnCommandKind::SeriesLookupResult(_) => todo!("TODO handle SeriesLookupResult"), + ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) { + Ok(()) => Ready(Some(Ok(()))), + Err(e) => Ready(Some(Err(e))), + }, } } Ready(None) => { @@ -912,15 +956,10 @@ impl CaConn { sid: u32, data_type: u16, data_count: u16, - series: Existence, - cx: &mut Context, + series: SeriesId, ) -> Result<(), Error> { let tsnow = Instant::now(); self.stats.get_series_id_ok_inc(); - let series = match series { - Existence::Created(k) => k, - Existence::Existing(k) => k, - }; if series.id() == 0 { warn!("Weird series id: {series:?}"); } @@ -962,6 +1001,8 @@ impl CaConn { cssid, cid, sid, + data_type, + data_count, scalar_type, shape, ts_created: tsnow, @@ -980,72 +1021,10 @@ impl CaConn { *ch_s = ChannelState::Created(series, created_state); let scalar_type = ScalarType::from_ca_id(data_type)?; let shape = Shape::from_ca_count(data_count)?; - cx.waker().wake_by_ref(); + error!("TODO channel_to_evented make sure we get polled again?"); Ok(()) } - fn emit_series_lookup(&mut self, cx: &mut Context) { - let _ = cx; - loop { - break if let Some(mut entry) = self.series_lookup_schedule.first_entry() { - todo!("emit_series_lookup"); - #[cfg(DISABLED)] - { - let dummy = entry.get().dummy(); - let query = std::mem::replace(entry.get_mut(), dummy); - match self.channel_info_query_tx.try_send(query) { - Ok(()) => { - entry.remove(); - continue; - } - Err(e) => match e { - async_channel::TrySendError::Full(_) => { - warn!("series lookup channel full"); - *entry.get_mut() = e.into_inner(); - } - async_channel::TrySendError::Closed(_) => { - warn!("series lookup channel closed"); - // *entry.get_mut() = e.into_inner(); - entry.remove(); - } - }, - } - } - } else { - () - }; - } - } - - fn poll_channel_info_results(&mut self, cx: &mut Context) { - use Poll::*; - loop { - break match self.series_lookup_futs.poll_next_unpin(cx) { - Ready(Some(Ok((cid, sid, data_type, data_count, series)))) => { - { - let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: SystemTime::now(), - series: series.clone().into_inner().into(), - status: ChannelStatus::Opened, - }); - self.insert_item_queue.push_back(item); - } - match self.channel_to_evented(cid, sid, data_type, data_count, series, cx) { - Ok(_) => {} - Err(e) => { - error!("poll_channel_info_results {e}"); - } - } - } - Ready(Some(Err(e))) => { - error!("poll_channel_info_results {e}"); - } - Ready(None) => {} - Pending => {} - }; - } - } - fn event_add_insert( st: &mut CreatedState, series: SeriesId, @@ -1451,6 +1430,8 @@ impl CaConn { cssid, cid, sid, + data_type: k.data_type, + data_count: k.data_count, scalar_type: scalar_type.clone(), shape: shape.clone(), ts_created: tsnow, @@ -1468,37 +1449,17 @@ impl CaConn { }; *ch_s = ChannelState::FetchingSeriesId(created_state); // TODO handle error in different way. Should most likely not abort. - if !self.series_lookup_schedule.contains_key(&cid) { - let tx = SendSeriesLookup { - tx: self.conn_command_tx.clone(), - }; - let query = ChannelInfoQuery { - backend: self.backend.clone(), - channel: name.clone(), - scalar_type: scalar_type.to_scylla_i32(), - shape_dims: shape.to_scylla_vec(), - tx: Box::pin(tx), - }; - self.series_lookup_schedule.insert(cid, query); - todo!("TODO discover the series lookup from main command queue"); - // let fut = async move { - // match rx.recv().await { - // Ok(item) => match item { - // Ok(item) => Ok((cid, sid, k.data_type, k.data_count, item)), - // Err(e) => Err(Error::with_msg_no_trace(e.to_string())), - // }, - // Err(e) => { - // // TODO count only - // error!("can not receive series lookup result for {name} {e}"); - // Err(Error::with_msg_no_trace("can not receive lookup result")) - // } - // } - // }; - // self.series_lookup_futs.push(Box::pin(fut)); - } else { - // TODO count only - warn!("series lookup for {name} already in progress"); - } + let tx = SendSeriesLookup { + tx: self.conn_command_tx.clone(), + }; + let query = ChannelInfoQuery { + backend: self.backend.clone(), + channel: name.clone(), + scalar_type: scalar_type.to_scylla_i32(), + shape_dims: shape.to_scylla_vec(), + tx: Box::pin(tx), + }; + self.channel_info_query_queue.push_back(query); do_wake_again = true; } CaMsgTy::EventAddRes(k) => { @@ -1739,8 +1700,6 @@ impl CaConn { } fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { - self.emit_series_lookup(cx); - self.poll_channel_info_results(cx); let this = self.get_mut(); for (_, tb) in this.time_binners.iter_mut() { let iiq = &mut this.insert_item_queue; @@ -1793,6 +1752,22 @@ impl Stream for CaConn { }; Ready(Some(Ok(ev))) } else { + let _ = loop { + let sd = &mut self.channel_info_query_sending; + break if sd.is_sending() { + match sd.poll_unpin(cx) { + Ready(Ok(())) => continue, + Ready(Err(e)) => Ready(Some(e)), + Pending => Pending, + } + } else if let Some(item) = self.channel_info_query_queue.pop_front() { + let sd = &mut self.channel_info_query_sending; + sd.send2(item); + continue; + } else { + Ready(None) + }; + }; let ret = loop { self.stats.caconn_loop1_count_inc(); loop { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 4be71b9..74f8a46 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -244,38 +244,7 @@ impl CaConnSet { ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x).await, ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await, ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await, - ConnSetCmd::IocAddrQueryResult(res) => { - for e in res { - if let Some(addr) = e.addr { - debug!("ioc found {e:?}"); - let ch = Channel::new(e.channel.clone()); - if let Some(chst) = self.channel_states.inner().get(&ch) { - if let ChannelStateValue::Active(ast) = &chst.value { - if let ActiveChannelState::WithStatusSeriesId { - status_series_id, - state, - } = ast - { - let add = ChannelAddWithAddr { - backend: self.backend.clone(), - name: e.channel, - addr: SocketAddr::V4(addr), - cssid: status_series_id.clone(), - local_epics_hostname: self.local_epics_hostname.clone(), - }; - } else { - warn!("TODO got address but no longer active"); - } - } else { - warn!("TODO got address but no longer active"); - } - } else { - warn!("ioc addr lookup done but channel no longer here"); - } - } - } - Ok(()) - } + ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await, ConnSetCmd::CheckHealth => { error!("TODO implement check health"); Ok(()) @@ -354,7 +323,7 @@ impl CaConnSet { *chst2 = ActiveChannelState::WithStatusSeriesId { status_series_id: add.cssid, state: WithStatusSeriesIdState { - inner: WithStatusSeriesIdStateInner::NoAddress { + inner: WithStatusSeriesIdStateInner::SearchPending { since: SystemTime::now(), }, }, @@ -384,6 +353,51 @@ impl CaConnSet { Ok(()) } + async fn handle_ioc_query_result(&mut self, res: VecDeque) -> Result<(), Error> { + for e in res { + let ch = Channel::new(e.channel.clone()); + if let Some(chst) = self.channel_states.inner().get_mut(&ch) { + if let ChannelStateValue::Active(ast) = &mut chst.value { + if let ActiveChannelState::WithStatusSeriesId { + status_series_id, + state, + } = ast + { + if let Some(addr) = e.addr { + debug!("ioc found {e:?}"); + let add = ChannelAddWithAddr { + backend: self.backend.clone(), + name: e.channel, + addr: SocketAddr::V4(addr), + cssid: status_series_id.clone(), + local_epics_hostname: self.local_epics_hostname.clone(), + }; + self.connset_tx + .send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelAddWithAddr(add))) + .await?; + let since = SystemTime::now(); + state.inner = WithStatusSeriesIdStateInner::WithAddress { + addr, + state: WithAddressState::Unassigned { since }, + } + } else { + debug!("ioc not found {e:?}"); + let since = SystemTime::now(); + state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since }; + } + } else { + warn!("TODO got address but no longer active"); + } + } else { + warn!("TODO got address but no longer active"); + } + } else { + warn!("ioc addr lookup done but channel no longer here"); + } + } + Ok(()) + } + fn create_ca_conn(&self, add: ChannelAddWithAddr) -> Result { // TODO should we save this as event? let opts = CaConnOpts::default(); @@ -629,15 +643,12 @@ impl CaConnSet { //info!("UnknownAddress {} {:?}", i, ch); if (search_pending_count as usize) < CURRENT_SEARCH_PENDING_MAX { search_pending_count += 1; - state.inner = WithStatusSeriesIdStateInner::SearchPending { - since: tsnow, - did_send: false, - }; + state.inner = WithStatusSeriesIdStateInner::SearchPending { since: tsnow }; SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel); } } } - WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => { + WithStatusSeriesIdStateInner::SearchPending { since } => { //info!("SearchPending {} {:?}", i, ch); let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); if dt > SEARCH_PENDING_TIMEOUT { @@ -650,7 +661,7 @@ impl CaConnSet { //info!("WithAddress {} {:?}", i, ch); use WithAddressState::*; match state { - Unassigned { assign_at } => { + Unassigned { since } => { // TODO do I need this case anymore? #[cfg(DISABLED)] if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow { @@ -716,59 +727,51 @@ impl CaConnSet { } fn update_channel_state_counts(&mut self) -> (u64,) { - let mut unknown_address_count = 0; - let mut search_pending_count = 0; - let mut search_pending_did_send_count = 0; - let mut unassigned_count = 0; - let mut assigned_count = 0; - let mut no_address_count = 0; + let mut unknown_address = 0; + let mut search_pending = 0; + let mut unassigned = 0; + let mut assigned = 0; + let mut no_address = 0; for (_ch, st) in self.channel_states.inner().iter() { match &st.value { ChannelStateValue::Active(st2) => match st2 { ActiveChannelState::Init { .. } => { - unknown_address_count += 1; + unknown_address += 1; } ActiveChannelState::WaitForStatusSeriesId { .. } => { - unknown_address_count += 1; + unknown_address += 1; } ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner { WithStatusSeriesIdStateInner::UnknownAddress { .. } => { - unknown_address_count += 1; + unknown_address += 1; } - WithStatusSeriesIdStateInner::SearchPending { did_send, .. } => { - if *did_send { - search_pending_did_send_count += 1; - } else { - search_pending_count += 1; - } + WithStatusSeriesIdStateInner::SearchPending { .. } => { + search_pending += 1; } WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state { WithAddressState::Unassigned { .. } => { - unassigned_count += 1; + unassigned += 1; } WithAddressState::Assigned(_) => { - assigned_count += 1; + assigned += 1; } }, WithStatusSeriesIdStateInner::NoAddress { .. } => { - no_address_count += 1; + no_address += 1; } }, }, ChannelStateValue::ToRemove { .. } => { - unknown_address_count += 1; + unknown_address += 1; } } } use atomic::Ordering::Release; - self.stats.channel_unknown_address.store(unknown_address_count, Release); - self.stats.channel_search_pending.store(search_pending_count, Release); - self.stats - .search_pending_did_send - .store(search_pending_did_send_count, Release); - self.stats.unassigned.store(unassigned_count, Release); - self.stats.assigned.store(assigned_count, Release); - self.stats.channel_no_address.store(no_address_count, Release); - (search_pending_count,) + self.stats.channel_unknown_address.store(unknown_address, Release); + self.stats.channel_search_pending.store(search_pending, Release); + self.stats.channel_no_address.store(no_address, Release); + self.stats.channel_unassigned.store(unassigned, Release); + self.stats.channel_assigned.store(assigned, Release); + (search_pending,) } } diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 26cfc97..822ff06 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -53,9 +53,6 @@ macro_rules! trace_batch { }); } -#[derive(Debug)] -pub struct IocAddrQueryResult {} - fn transform_pgres(rows: Vec) -> VecDeque { let mut ret = VecDeque::new(); for row in rows { diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index f1aca41..0ef2339 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -53,7 +53,7 @@ pub struct ConnectionState { pub enum WithAddressState { Unassigned { //#[serde(with = "serde_Instant")] - assign_at: SystemTime, + since: SystemTime, }, Assigned(ConnectionState), } @@ -66,7 +66,6 @@ pub enum WithStatusSeriesIdStateInner { SearchPending { //#[serde(with = "serde_Instant")] since: SystemTime, - did_send: bool, }, WithAddress { addr: SocketAddrV4, diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 5253efe..b19d02d 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -212,12 +212,11 @@ stats_proc::stats_struct!(( name(CaConnSetStats), counters( channel_unknown_address, - channel_with_address, channel_search_pending, - search_pending_did_send, channel_no_address, - unassigned, - assigned, + channel_with_address, + channel_unassigned, + channel_assigned, ), ), // agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)), @@ -272,6 +271,7 @@ stats_proc::stats_struct!(( channel_all_count, channel_alive_count, channel_not_alive_count, + channel_series_lookup_already_pending, ca_ts_off_1, ca_ts_off_2, ca_ts_off_3,