From ba9bb7e26c9edf7e31c3aaf42bce867052f7b87a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 6 Sep 2023 17:34:27 +0200 Subject: [PATCH] WIP refactor --- .github/workflows/build-rhel7.yml | 15 +- daqingest/Cargo.toml | 1 - daqingest/src/daemon.rs | 514 +++--------------------------- daqingest/src/daemon/finder.rs | 339 -------------------- dbpg/src/seriesbychannel.rs | 28 +- ingest-bsread/src/bsreadclient.rs | 4 + netfetch/Cargo.toml | 1 + netfetch/src/ca.rs | 2 + netfetch/src/ca/conn.rs | 96 ++++-- netfetch/src/ca/connset.rs | 424 ++++++++++++++++++++++-- netfetch/src/ca/finder.rs | 355 +++++++++++++++++++++ netfetch/src/ca/statemap.rs | 123 +++++++ stats/src/stats.rs | 17 + 13 files changed, 1028 insertions(+), 891 deletions(-) create mode 100644 netfetch/src/ca/finder.rs create mode 100644 netfetch/src/ca/statemap.rs diff --git a/.github/workflows/build-rhel7.yml b/.github/workflows/build-rhel7.yml index a350caa..23d44fc 100644 --- a/.github/workflows/build-rhel7.yml +++ b/.github/workflows/build-rhel7.yml @@ -99,19 +99,16 @@ jobs: # id: daqingest_version_set # working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release - run: "echo 'version: [${{steps.daqingest_version_set.outputs.daqingest_version}}]'" - - run: "mkdir daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}" - - run: "cp ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}/daqingest" - - run: "tar -czf daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}.tar.gz daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}" + - run: "mkdir daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7" + - run: "cp ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7/daqingest" + - run: "tar -czf daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7.tar.gz daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7" - uses: actions/upload-artifact@v3 with: name: daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}} path: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest - - uses: actions/upload-artifact@v3 - with: - name: daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}.tar.gz - path: daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}.tar.gz - run: echo "{\"tag_name\":\"buildaction\", \"name\":\"daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}\", \"draft\":true, \"prerelease\":true}" > create-rel.json - - run: "curl -v -o rel.json -L -X POST -H content-type:application/json -H 'accept:application/vnd.github+json' -H 'authorization:bearer ${{secrets.github_token}}' -H 'x-github-api-version: 2022-11-28' -T create-rel.json https://api.github.com/repos/paulscherrerinstitute/daqingest/releases" + - run: "curl -v -o rel.json -L -X POST -H content-type:application/json -H 'accept:application/vnd.github+json' -H 'authorization:bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T create-rel.json https://api.github.com/repos/paulscherrerinstitute/daqingest/releases" - run: cat rel.json - - run: "RELID=$(python -c 'import json; x=json.load(open(\"rel.json\")); print(x[\"id\"])') && curl -v -o relass.json -L -X POST -H content-type:application/octet-stream -H 'accept:application/vnd.github+json' -H 
'authorization:Bearer ${{secrets.github_token}}' -H 'X-GitHub-Api-Version: 2022-11-28' -T ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest https://uploads.github.com/repos/paulscherrerinstitute/daqingest/releases/$RELID/assets?name=daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7" + - run: "RELID=$(python -c 'import json; x=json.load(open(\"rel.json\")); print(x[\"id\"])') && curl -v -o relass.json -L -X POST -H content-type:application/octet-stream -H 'accept:application/vnd.github+json' -H 'authorization:Bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest https://uploads.github.com/repos/paulscherrerinstitute/daqingest/releases/$RELID/assets?name=daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7" + - run: "RELID=$(python -c 'import json; x=json.load(open(\"rel.json\")); print(x[\"id\"])') && curl -v -o relass.json -L -X POST -H content-type:application/octet-stream -H 'accept:application/vnd.github+json' -H 'authorization:Bearer ${{secrets.github_token}}' -H x-github-api-version:2022-11-28 -T daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7.tar.gz https://uploads.github.com/repos/paulscherrerinstitute/daqingest/releases/$RELID/assets?name=daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}}-amd64-rhel7.tar.gz" - run: cat relass.json diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 8fbcf94..05b017e 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -28,6 +28,5 @@ scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } series = { path = "../series" } netfetch = { path = "../netfetch" } -batchtools = { path = "../batchtools" } ingest-bsread = { path = "../ingest-bsread" } ingest-linux = { path = "../ingest-linux" } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index c14fa8c..730ecbc 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -51,126 +51,12 @@ use tokio::task::JoinHandle; use tracing::info_span; use tracing::Instrument; -const SEARCH_BATCH_MAX: usize = 256; -const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4; -const SEARCH_DB_PIPELINE_LEN: usize = 4; -const FINDER_JOB_QUEUE_LEN_MAX: usize = 10; -const FINDER_IN_FLIGHT_MAX: usize = 800; -const FINDER_BATCH_SIZE: usize = 8; -const CHECK_CHANS_PER_TICK: usize = 10000; const CA_CONN_INSERT_QUEUE_MAX: usize = 256; -const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = i32::MIN + 1; -const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000); -const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000); -const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); -const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000); -const FINDER_TIMEOUT: Duration = Duration::from_millis(100); const CHANNEL_CHECK_INTERVAL: Duration = Duration::from_millis(5000); const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000); const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000); -const DO_ASSIGN_TO_CA_CONN: bool = true; - -static SEARCH_REQ_MARK_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_REQ_SEND_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_REQ_RECV_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_REQ_BATCH_SEND_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_REQ_BATCH_RECV_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_RES_0_COUNT: AtomicUsize = 
AtomicUsize::new(0); -static SEARCH_RES_1_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_RES_2_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_RES_3_COUNT: AtomicUsize = AtomicUsize::new(0); -static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0); - -#[derive(Clone, Debug, Serialize)] -pub enum ConnectionStateValue { - Unconnected, - Connected { - //#[serde(with = "serde_Instant")] - since: SystemTime, - }, -} - -#[derive(Clone, Debug, Serialize)] -pub struct ConnectionState { - //#[serde(with = "serde_Instant")] - updated: SystemTime, - value: ConnectionStateValue, -} - -#[derive(Clone, Debug, Serialize)] -pub enum WithAddressState { - Unassigned { - //#[serde(with = "serde_Instant")] - assign_at: SystemTime, - }, - Assigned(ConnectionState), -} - -#[derive(Clone, Debug, Serialize)] -pub enum WithStatusSeriesIdStateInner { - UnknownAddress { - since: SystemTime, - }, - SearchPending { - //#[serde(with = "serde_Instant")] - since: SystemTime, - did_send: bool, - }, - WithAddress { - addr: SocketAddrV4, - state: WithAddressState, - }, - NoAddress { - since: SystemTime, - }, -} - -#[derive(Clone, Debug, Serialize)] -pub struct WithStatusSeriesIdState { - inner: WithStatusSeriesIdStateInner, -} - -#[derive(Clone, Debug)] -pub enum ActiveChannelState { - Init { - since: SystemTime, - }, - WaitForStatusSeriesId { - since: SystemTime, - rx: Receiver, dbpg::seriesbychannel::Error>>, - }, - WithStatusSeriesId { - status_series_id: ChannelStatusSeriesId, - state: WithStatusSeriesIdState, - }, -} - -#[derive(Debug)] -pub enum ChannelStateValue { - Active(ActiveChannelState), - ToRemove { addr: Option }, -} - -#[derive(Debug)] -pub struct ChannelState { - value: ChannelStateValue, -} - -#[derive(Debug)] -pub enum CaConnStateValue { - Fresh, - HadFeedback, - Shutdown { since: Instant }, -} - -#[derive(Debug)] -pub struct CaConnState { - last_feedback: Instant, - value: CaConnStateValue, -} - pub struct DaemonOpts { backend: String, local_epics_hostname: String, @@ -193,13 +79,8 @@ impl DaemonOpts { pub struct Daemon { opts: DaemonOpts, - connection_states: BTreeMap, - channel_states: BTreeMap, tx: Sender, rx: Receiver, - chan_check_next: Option, - search_tx: Sender, - ioc_finder_jh: JoinHandle>, insert_queue_counter: Arc, count_unknown_address: usize, count_search_pending: usize, @@ -226,8 +107,6 @@ impl Daemon { .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let datastore = Arc::new(datastore); let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32); - let (search_tx, ioc_finder_jh) = - finder::start_finder(daemon_ev_tx.clone(), opts.backend().into(), opts.pgconf.clone()); // TODO keep join handles and await later let (channel_info_query_tx, ..) 
= dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf) @@ -242,8 +121,11 @@ impl Daemon { let common_insert_item_queue_2 = rx; let conn_set_ctrl = CaConnSet::start( + opts.backend.clone(), + opts.local_epics_hostname.clone(), common_insert_item_queue.sender().unwrap().inner().clone(), channel_info_query_tx.clone(), + opts.pgconf.clone(), ); let ingest_commons = IngestCommons { @@ -313,13 +195,8 @@ impl Daemon { let ret = Self { opts, - connection_states: BTreeMap::new(), - channel_states: BTreeMap::new(), tx: daemon_ev_tx, rx: daemon_ev_rx, - chan_check_next: None, - search_tx, - ioc_finder_jh, insert_queue_counter, count_unknown_address: 0, count_search_pending: 0, @@ -349,324 +226,6 @@ impl Daemon { !self.shutting_down } - fn check_connection_states(&mut self) -> Result<(), Error> { - let tsnow = Instant::now(); - for (k, v) in &mut self.connection_states { - match v.value { - CaConnStateValue::Fresh => { - // TODO check for delta t since last issued status command. - if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) { - error!("TODO Fresh timeout send connection-close for {k:?}"); - self.stats.ca_conn_status_feedback_timeout_inc(); - v.value = CaConnStateValue::Shutdown { since: tsnow }; - } - } - CaConnStateValue::HadFeedback => { - // TODO check for delta t since last issued status command. - if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) { - error!("TODO HadFeedback timeout send connection-close for {k:?}"); - self.stats.ca_conn_status_feedback_timeout_inc(); - v.value = CaConnStateValue::Shutdown { since: tsnow }; - } - } - CaConnStateValue::Shutdown { since } => { - if tsnow.saturating_duration_since(since) > Duration::from_millis(10000) { - self.stats.critical_error_inc(); - error!("Shutdown of CaConn to {} failed", k); - } - } - } - } - Ok(()) - } - - fn update_channel_state_counts(&mut self) -> (u64,) { - let mut unknown_address_count = 0; - let mut with_address_count = 0; - let mut search_pending_count = 0; - let mut no_address_count = 0; - for (_ch, st) in &self.channel_states { - match &st.value { - ChannelStateValue::Active(st2) => match st2 { - ActiveChannelState::Init { .. } => { - unknown_address_count += 1; - } - ActiveChannelState::WaitForStatusSeriesId { .. } => { - unknown_address_count += 1; - } - ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner { - WithStatusSeriesIdStateInner::UnknownAddress { .. } => { - unknown_address_count += 1; - } - WithStatusSeriesIdStateInner::SearchPending { .. } => { - search_pending_count += 1; - } - WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state { - WithAddressState::Unassigned { .. } => { - with_address_count += 1; - } - WithAddressState::Assigned(_) => { - with_address_count += 1; - } - }, - WithStatusSeriesIdStateInner::NoAddress { .. } => { - no_address_count += 1; - } - }, - }, - ChannelStateValue::ToRemove { .. 
} => { - unknown_address_count += 1; - } - } - } - self.stats - .channel_unknown_address - .store(unknown_address_count, atomic::Ordering::Release); - self.stats - .channel_with_address - .store(with_address_count, atomic::Ordering::Release); - self.stats - .channel_search_pending - .store(search_pending_count, atomic::Ordering::Release); - self.stats - .channel_no_address - .store(no_address_count, atomic::Ordering::Release); - (search_pending_count,) - } - - fn channel_state_counts_2(&mut self) { - self.count_unknown_address = 0; - self.count_search_pending = 0; - self.count_search_sent = 0; - self.count_no_address = 0; - self.count_unassigned = 0; - self.count_assigned = 0; - for (_ch, st) in &self.channel_states { - match &st.value { - ChannelStateValue::Active(st) => match st { - ActiveChannelState::Init { .. } => {} - ActiveChannelState::WaitForStatusSeriesId { .. } => {} - ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner { - WithStatusSeriesIdStateInner::UnknownAddress { .. } => { - self.count_unknown_address += 1; - } - WithStatusSeriesIdStateInner::SearchPending { did_send, .. } => { - self.count_search_pending += 1; - if *did_send { - self.count_search_sent += 1; - } - } - WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state { - WithAddressState::Unassigned { .. } => { - self.count_unassigned += 1; - } - WithAddressState::Assigned(_) => { - self.count_assigned += 1; - } - }, - WithStatusSeriesIdStateInner::NoAddress { .. } => { - self.count_no_address += 1; - } - }, - }, - ChannelStateValue::ToRemove { .. } => {} - } - } - } - - async fn check_channel_states(&mut self) -> Result<(), Error> { - let (mut search_pending_count,) = self.update_channel_state_counts(); - let k = self.chan_check_next.take(); - let it = if let Some(last) = k { - trace!("check_chans start at {:?}", last); - self.channel_states.range_mut(last..) - } else { - self.channel_states.range_mut(..) - }; - let tsnow = SystemTime::now(); - let mut attempt_series_search = true; - for (i, (ch, st)) in it.enumerate() { - match &mut st.value { - ChannelStateValue::Active(st2) => match st2 { - ActiveChannelState::Init { since: _ } => { - let (tx, rx) = async_channel::bounded(1); - let q = ChannelInfoQuery { - backend: self.ingest_commons.backend.clone(), - channel: ch.id().into(), - scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE, - shape_dims: Vec::new(), - tx, - }; - if attempt_series_search { - match self.channel_info_query_tx.try_send(q) { - Ok(()) => { - *st2 = ActiveChannelState::WaitForStatusSeriesId { since: tsnow, rx }; - } - Err(e) => match e { - _ => { - attempt_series_search = false; - } - }, - } - } - } - ActiveChannelState::WaitForStatusSeriesId { since, rx } => { - let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); - if dt > Duration::from_millis(5000) { - warn!("timeout can not get status series id for {ch:?}"); - *st2 = ActiveChannelState::Init { since: tsnow }; - } else { - match rx.try_recv() { - Ok(x) => match x { - Ok(x) => { - //info!("received status series id: {x:?}"); - *st2 = ActiveChannelState::WithStatusSeriesId { - status_series_id: ChannelStatusSeriesId::new(x.into_inner().id()), - state: WithStatusSeriesIdState { - inner: WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow }, - }, - }; - } - Err(e) => { - error!("could not get a status series id {ch:?} {e}"); - } - }, - Err(_) => { - // TODO should maybe not attempt receive on each channel check. 
- } - } - } - } - ActiveChannelState::WithStatusSeriesId { - status_series_id, - state, - } => match &mut state.inner { - WithStatusSeriesIdStateInner::UnknownAddress { since } => { - let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); - if dt > UNKNOWN_ADDRESS_STAY { - //info!("UnknownAddress {} {:?}", i, ch); - if (search_pending_count as usize) < CURRENT_SEARCH_PENDING_MAX { - search_pending_count += 1; - state.inner = WithStatusSeriesIdStateInner::SearchPending { - since: tsnow, - did_send: false, - }; - SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel); - } - } - } - WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => { - //info!("SearchPending {} {:?}", i, ch); - let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); - if dt > SEARCH_PENDING_TIMEOUT { - info!("Search timeout for {ch:?}"); - state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow }; - search_pending_count -= 1; - } - } - WithStatusSeriesIdStateInner::WithAddress { addr, state } => { - //info!("WithAddress {} {:?}", i, ch); - use WithAddressState::*; - match state { - Unassigned { assign_at } => { - if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow { - let backend = self.opts.backend().into(); - let addr_v4 = SocketAddr::V4(*addr); - let name = ch.id().into(); - let cssid = status_series_id.clone(); - let local_epics_hostname = self.opts.local_epics_hostname.clone(); - // This operation is meant to complete very quickly - self.connset_ctrl - .add_channel(backend, addr_v4, name, cssid, local_epics_hostname) - .slow_warn(500) - .instrument(info_span!("add_channel_to_addr")) - .await?; - let cs = ConnectionState { - updated: tsnow, - value: ConnectionStateValue::Unconnected, - }; - *state = WithAddressState::Assigned(cs); - self.connection_states - .entry(*addr) - .and_modify(|_| { - // TODO may be count for metrics. - // Nothing else to do. - }) - .or_insert_with(|| { - let t = CaConnState { - last_feedback: Instant::now(), - value: CaConnStateValue::Fresh, - }; - t - }); - // TODO move await out of here - if let Some(tx) = self.ingest_commons.insert_item_queue.sender() { - let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: tsnow, - series: SeriesId::new(status_series_id.id()), - status: ChannelStatus::AssignedToAddress, - }); - match tx.send(item).await { - Ok(_) => {} - Err(_) => { - // TODO feed into throttled log, or count as unlogged - } - } - } - } - } - Assigned(_) => { - // TODO check if channel is healthy and alive - } - } - } - WithStatusSeriesIdStateInner::NoAddress { since } => { - let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); - if dt > NO_ADDRESS_STAY { - state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow }; - } - } - }, - }, - ChannelStateValue::ToRemove { .. 
} => { - // TODO if assigned to some address, - } - } - if i >= CHECK_CHANS_PER_TICK { - self.chan_check_next = Some(ch.clone()); - break; - } - } - for (ch, st) in &mut self.channel_states { - if let ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId { - status_series_id: _, - state, - }) = &mut st.value - { - if let WithStatusSeriesIdStateInner::SearchPending { since: _, did_send } = &mut state.inner { - if *did_send == false { - match self.search_tx.try_send(ch.id().into()) { - Ok(()) => { - *did_send = true; - SEARCH_REQ_SEND_COUNT.fetch_add(1, atomic::Ordering::AcqRel); - } - Err(e) => match e { - async_channel::TrySendError::Full(_) => {} - async_channel::TrySendError::Closed(_) => { - error!("Finder channel closed"); - // TODO recover from this. - panic!(); - } - }, - } - } - } - } - } - self.channel_state_counts_2(); - Ok(()) - } - async fn check_caconn_chans(&mut self) -> Result<(), Error> { if self.caconn_last_channel_check.elapsed() > CHANNEL_CHECK_INTERVAL { self.connset_ctrl.check_health().await?; @@ -709,8 +268,10 @@ impl Daemon { warn!("Received SIGTERM"); SIGTERM.store(2, atomic::Ordering::Release); } - self.check_connection_states()?; - self.check_channel_states().await?; + warn!("TODO let CaConnSet check health"); + // TODO + // self.check_connection_states()?; + // self.check_channel_states().await?; let dt = ts1.elapsed(); if dt > Duration::from_millis(500) { info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3); @@ -733,38 +294,24 @@ impl Daemon { self.count_assigned, self.insert_queue_counter.load(atomic::Ordering::Acquire), ); - if false { - info!( - "{:5} {:5} {:5} {:5} {:5} {:5} {:5} {:5} {:5} {:5}", - SEARCH_REQ_MARK_COUNT.load(atomic::Ordering::Acquire), - SEARCH_REQ_SEND_COUNT.load(atomic::Ordering::Acquire), - SEARCH_REQ_RECV_COUNT.load(atomic::Ordering::Acquire), - SEARCH_REQ_BATCH_SEND_COUNT.load(atomic::Ordering::Acquire), - SEARCH_REQ_BATCH_RECV_COUNT.load(atomic::Ordering::Acquire), - SEARCH_RES_0_COUNT.load(atomic::Ordering::Acquire), - SEARCH_RES_1_COUNT.load(atomic::Ordering::Acquire), - SEARCH_RES_2_COUNT.load(atomic::Ordering::Acquire), - SEARCH_RES_3_COUNT.load(atomic::Ordering::Acquire), - SEARCH_ANS_COUNT.load(atomic::Ordering::Acquire), - ); - } } Ok(()) } - fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> { - if !self.channel_states.contains_key(&ch) { - let st = ChannelState { - value: ChannelStateValue::Active(ActiveChannelState::Init { - since: SystemTime::now(), - }), - }; - self.channel_states.insert(ch, st); - } + async fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> { + self.connset_ctrl + .add_channel( + self.opts.backend.clone(), + ch.id().into(), + self.opts.local_epics_hostname.clone(), + ) + .await?; Ok(()) } - fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> { + async fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> { + warn!("TODO handle_channel_remove"); + #[cfg(DISABLED)] if let Some(k) = self.channel_states.get_mut(&ch) { match &k.value { ChannelStateValue::Active(j) => match j { @@ -801,9 +348,11 @@ impl Daemon { } async fn handle_search_done(&mut self, item: Result, Error>) -> Result<(), Error> { + warn!("TODO handle_search_done"); //debug!("handle SearchDone: {res:?}"); - let allow_create_new_connections = self.allow_create_new_connections(); - let tsnow = SystemTime::now(); + // let allow_create_new_connections = self.allow_create_new_connections(); + // let tsnow = SystemTime::now(); + #[cfg(DISABLED)] match item { Ok(ress) => { 
SEARCH_ANS_COUNT.fetch_add(ress.len(), atomic::Ordering::AcqRel); @@ -916,6 +465,12 @@ impl Daemon { Ok(()) } + async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> { + warn!("TODO handle_ca_conn_done {conn_addr:?}"); + Ok(()) + } + + #[cfg(DISABLED)] async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> { info!("handle_ca_conn_done {conn_addr:?}"); self.connection_states.remove(&conn_addr); @@ -984,6 +539,8 @@ impl Daemon { use netfetch::ca::conn::ConnCommandResultKind::*; match &item.kind { CheckHealth => { + todo!("TODO collect the CaConn health check in CaConnSet"); + #[cfg(DISABLED)] if let Some(st) = self.connection_states.get_mut(&addr) { self.stats.ca_conn_status_feedback_recv_inc(); st.last_feedback = Instant::now(); @@ -1006,6 +563,11 @@ impl Daemon { } } + async fn handle_shutdown(&mut self) -> Result<(), Error> { + todo!("handle_shutdown"); + } + + #[cfg(DISABLED)] async fn handle_shutdown(&mut self) -> Result<(), Error> { warn!("received shutdown event"); if self.shutting_down { @@ -1041,8 +603,8 @@ impl Daemon { let _ = ts1.elapsed(); ret } - ChannelAdd(ch) => self.handle_channel_add(ch), - ChannelRemove(ch) => self.handle_channel_remove(ch), + ChannelAdd(ch) => self.handle_channel_add(ch).await, + ChannelRemove(ch) => self.handle_channel_remove(ch).await, SearchDone(item) => self.handle_search_done(item).await, CaConnEvent(addr, item) => self.handle_ca_conn_event(addr, item).await, Shutdown => self.handle_shutdown().await, @@ -1108,8 +670,6 @@ impl Daemon { } } } - warn!("TODO wait for IOC finder properly"); - let _ = &self.ioc_finder_jh; warn!("TODO wait for insert workers"); let _ = &self.insert_workers_jh; info!("daemon done"); diff --git a/daqingest/src/daemon/finder.rs b/daqingest/src/daemon/finder.rs index 7fec502..8b13789 100644 --- a/daqingest/src/daemon/finder.rs +++ b/daqingest/src/daemon/finder.rs @@ -1,340 +1 @@ -use super::FINDER_BATCH_SIZE; -use super::FINDER_IN_FLIGHT_MAX; -use super::FINDER_TIMEOUT; -use crate::daemon::CURRENT_SEARCH_PENDING_MAX; -use crate::daemon::FINDER_JOB_QUEUE_LEN_MAX; -use crate::daemon::SEARCH_BATCH_MAX; -use crate::daemon::SEARCH_DB_PIPELINE_LEN; -use crate::daemon::SEARCH_REQ_BATCH_RECV_COUNT; -use crate::daemon::SEARCH_RES_0_COUNT; -use crate::daemon::SEARCH_RES_1_COUNT; -use crate::daemon::SEARCH_RES_2_COUNT; -use crate::daemon::SEARCH_RES_3_COUNT; -use async_channel::Receiver; -use async_channel::Sender; -use dbpg::conn::make_pg_client; -use dbpg::postgres::Row as PgRow; -use err::Error; -use futures_util::FutureExt; -use futures_util::StreamExt; -use log::*; -use netfetch::ca::findioc::FindIocRes; -use netfetch::ca::findioc::FindIocStream; -use netfetch::daemon_common::DaemonEvent; -use netpod::Database; -use std::collections::HashMap; -use std::collections::VecDeque; -use std::net::SocketAddrV4; -use std::sync::atomic; -use std::time::Duration; -use std::time::Instant; -use taskrun::tokio; -use tokio::task::JoinHandle; -#[allow(unused)] -macro_rules! debug_batch { - // (D$($arg:tt)*) => (); - ($($arg:tt)*) => (if false { - debug!($($arg)*); - }); -} - -#[allow(unused)] -macro_rules! 
trace_batch { - // (D$($arg:tt)*) => (); - ($($arg:tt)*) => (if false { - trace!($($arg)*); - }); -} - -fn transform_pgres(rows: Vec) -> VecDeque { - let mut ret = VecDeque::new(); - for row in rows { - let ch: Result = row.try_get(0); - if let Ok(ch) = ch { - if let Some(addr) = row.get::<_, Option>(1) { - let addr = addr.parse().map_or(None, |x| Some(x)); - let item = FindIocRes { - channel: ch, - response_addr: None, - addr, - dt: Duration::from_millis(0), - }; - ret.push_back(item); - } else { - let item = FindIocRes { - channel: ch, - response_addr: None, - addr: None, - dt: Duration::from_millis(0), - }; - ret.push_back(item); - } - } else if let Err(e) = ch { - error!("bad string from pg: {e:?}"); - } - } - ret -} - -async fn finder_worker_single( - inp: Receiver>, - tx: Sender, - backend: String, - db: Database, -) -> Result<(), Error> { - let pg = make_pg_client(&db) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let sql = concat!( - "with q1 as (select * from unnest($2::text[]) as unn (ch))", - " select distinct on (tt.facility, tt.channel) tt.channel, tt.addr", - " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null", - " order by tt.facility, tt.channel, tsmod desc", - ); - let qu_select_multi = pg - .prepare(sql) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let mut resdiff = 0; - loop { - match inp.recv().await { - Ok(batch) => { - SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel); - let ts1 = Instant::now(); - debug_batch!("run query batch len {}", batch.len()); - let qres = pg.query(&qu_select_multi, &[&backend, &batch]).await; - let dt = ts1.elapsed(); - debug_batch!( - "done query batch len {}: {} {:.3}ms", - batch.len(), - qres.is_ok(), - dt.as_secs_f32() * 1e3 - ); - if dt > Duration::from_millis(5000) { - let mut out = String::from("["); - for s in &batch { - if out.len() > 1 { - out.push_str(", "); - } - out.push('\''); - out.push_str(s); - out.push('\''); - } - out.push(']'); - eprintln!("VERY SLOW QUERY\n{out}"); - } - match qres { - Ok(rows) => { - if rows.len() > batch.len() { - error!("MORE RESULTS THAN INPUT"); - } else if rows.len() < batch.len() { - resdiff += batch.len() - rows.len(); - } - let nbatch = batch.len(); - trace_batch!("received results {} resdiff {}", rows.len(), resdiff); - SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel); - let items = transform_pgres(rows); - let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect(); - let mut to_add = Vec::new(); - for s in batch { - if !names.contains_key(&s) { - let item = FindIocRes { - channel: s, - response_addr: None, - addr: None, - dt: Duration::from_millis(0), - }; - to_add.push(item); - } - } - SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); - SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel); - let mut items = items; - items.extend(to_add.into_iter()); - if items.len() != nbatch { - error!("STILL NOT MATCHING LEN"); - } - SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); - let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await; - match x { - Ok(_) => {} - Err(e) => { - error!("finder sees: {e}"); - break; - } - } - } - Err(e) => { - error!("finder sees error: {e}"); - tokio::time::sleep(Duration::from_millis(1000)).await; - } - } - } - Err(_e) => break, - } - } - Ok(()) -} - -async fn finder_worker( - qrx: Receiver, - tx: Sender, - backend: String, - db: Database, -) -> Result<(), 
Error> { - // TODO do something with join handle - let (batch_rx, _jh) = batchtools::batcher::batch( - SEARCH_BATCH_MAX, - Duration::from_millis(200), - SEARCH_DB_PIPELINE_LEN, - qrx, - ); - for _ in 0..SEARCH_DB_PIPELINE_LEN { - // TODO use join handle - tokio::spawn(finder_worker_single( - batch_rx.clone(), - tx.clone(), - backend.clone(), - db.clone(), - )); - } - Ok(()) -} - -pub fn start_finder( - tx: Sender, - backend: String, - db: Database, -) -> (Sender, tokio::task::JoinHandle>) { - let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX); - let jh = taskrun::spawn(finder_worker(qrx, tx, backend, db)); - (qtx, jh) -} - -struct OptFut { - fut: Option, -} - -impl OptFut { - fn empty() -> Self { - Self { fut: None } - } - - fn new(fut: F) -> Self { - Self { fut: Some(fut) } - } - - fn is_enabled(&self) -> bool { - self.fut.is_some() - } -} - -impl futures_util::Future for OptFut -where - F: futures_util::Future + std::marker::Unpin, -{ - type Output = ::Output; - - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll { - match self.fut.as_mut() { - Some(fut) => fut.poll_unpin(cx), - None => std::task::Poll::Pending, - } - } -} - -#[allow(unused)] -fn start_finder_ca(tx: Sender, tgts: Vec) -> (Sender, JoinHandle<()>) { - let (qtx, qrx) = async_channel::bounded(32); - let (atx, arx) = async_channel::bounded(32); - let ioc_finder_fut = async move { - let mut finder = FindIocStream::new(tgts, FINDER_TIMEOUT, FINDER_IN_FLIGHT_MAX, FINDER_BATCH_SIZE); - let fut_tick_dur = Duration::from_millis(100); - let mut finder_more = true; - let mut finder_fut = OptFut::new(finder.next()); - let mut qrx_fut = OptFut::new(qrx.recv()); - let mut qrx_more = true; - let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); - let mut asend = OptFut::empty(); - loop { - tokio::select! { - _ = &mut asend, if asend.is_enabled() => { - asend = OptFut::empty(); - } - r1 = &mut finder_fut, if finder_fut.is_enabled() => { - finder_fut = OptFut::empty(); - match r1 { - Some(item) => { - asend = OptFut::new(atx.send(item)); - } - None => { - // TODO finder has stopped, do no longer poll on it - warn!("Finder has stopped"); - finder_more = false; - } - } - if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - qrx_fut = OptFut::new(qrx.recv()); - } - if finder_more { - finder_fut = OptFut::new(finder.next()); - } - fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); - } - r2 = &mut qrx_fut, if qrx_fut.is_enabled() => { - qrx_fut = OptFut::empty(); - match r2 { - Ok(item) => { - finder.push(item); - } - Err(e) => { - // TODO input is done... ignore from here on. 
- error!("Finder input channel error {e}"); - qrx_more = false; - } - } - if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - qrx_fut = OptFut::new(qrx.recv()); - } - if finder_more { - finder_fut = OptFut::new(finder.next()); - } else { - finder_fut = OptFut::empty(); - } - fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); - } - _ = &mut fut_tick => { - if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { - qrx_fut = OptFut::new(qrx.recv()); - } - if finder_more { - finder_fut = OptFut::new(finder.next()); - } else { - finder_fut = OptFut::empty(); - } - fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); - } - else => { - error!("all branches are disabled"); - break; - } - }; - } - }; - let ioc_finder_jh = taskrun::spawn(ioc_finder_fut); - taskrun::spawn({ - async move { - while let Ok(item) = arx.recv().await { - match tx.send(DaemonEvent::SearchDone(item)).await { - Ok(_) => {} - Err(e) => { - error!("search res fwd {e}"); - } - } - } - warn!("search res fwd nput broken"); - } - }); - (qtx, ioc_finder_jh) -} diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index c9072aa..1ee5a78 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -1,13 +1,16 @@ use async_channel::Receiver; +use async_channel::SendError; use async_channel::Sender; use err::thiserror; use err::ThisError; +use futures_util::Future; use futures_util::StreamExt; use log::*; use md5::Digest; use netpod::Database; use series::series::Existence; use series::SeriesId; +use std::pin::Pin; use std::time::Duration; use std::time::Instant; use taskrun::tokio; @@ -50,29 +53,23 @@ impl From for Error { } } +pub type BoxedSend = Pin> + Send>>; + +pub trait CanSendChannelInfoResult: Sync { + fn make_send(&self, item: Result, Error>) -> BoxedSend; +} + pub struct ChannelInfoQuery { pub backend: String, pub channel: String, pub scalar_type: i32, pub shape_dims: Vec, - pub tx: Sender, Error>>, -} - -impl ChannelInfoQuery { - pub fn dummy(&self) -> Self { - Self { - backend: String::new(), - channel: String::new(), - scalar_type: -1, - shape_dims: Vec::new(), - tx: self.tx.clone(), - } - } + pub tx: Pin>, } struct ChannelInfoResult { series: Existence, - tx: Sender, Error>>, + tx: Pin>, // only for trace: channel: String, } @@ -285,7 +282,8 @@ impl Worker { let res4 = res3?; for r in res4 { trace3!("try to send result for {} {:?}", r.channel, r.series); - match r.tx.send(Ok(r.series)).await { + let fut = r.tx.make_send(Ok(r.series)); + match fut.await { Ok(()) => {} Err(_e) => { warn!("can not deliver result"); diff --git a/ingest-bsread/src/bsreadclient.rs b/ingest-bsread/src/bsreadclient.rs index 88347d2..d4c855a 100644 --- a/ingest-bsread/src/bsreadclient.rs +++ b/ingest-bsread/src/bsreadclient.rs @@ -125,6 +125,10 @@ impl BsreadClient { let evtset: Vec<_> = fr.data.iter().map(|&x| x != 0).collect(); let scalar_type = ScalarType::BOOL; let shape = Shape::Wave(256); + if true { + todo!("TODO bsreadclient try to fetch series id"); + } + #[cfg(DISABLED)] if self.tmp_evtset_series.is_none() { debug!("try to fetch series id"); let (tx, rx) = async_channel::bounded(8); diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 56484a2..e6b04ca 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -43,3 +43,4 @@ items_2 = { path = "../../daqbuffer/crates/items_2" } streams = { path = "../../daqbuffer/crates/streams" } taskrun = { path = "../../daqbuffer/crates/taskrun" } bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } +batchtools = { 
path = "../batchtools" } diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 7e17b76..810b5c2 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,8 +1,10 @@ pub mod conn; pub mod connset; +pub mod finder; pub mod findioc; pub mod proto; pub mod search; +pub mod statemap; use self::connset::CaConnSetCtrl; use crate::ca::connset::CaConnSet; diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 877e901..5cee7e2 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -9,6 +9,7 @@ use crate::ca::proto::EventAdd; use crate::senderpolling::SenderPolling; use crate::timebin::ConnTimeBin; use async_channel::Sender; +use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; use futures_util::stream::FuturesUnordered; @@ -306,6 +307,7 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 { #[derive(Debug)] pub enum ConnCommandKind { + SeriesLookupResult(Result, dbpg::seriesbychannel::Error>), ChannelAdd(String, ChannelStatusSeriesId), ChannelRemove(String), CheckHealth, @@ -319,6 +321,13 @@ pub struct ConnCommand { } impl ConnCommand { + pub fn series_lookup(qu: Result, dbpg::seriesbychannel::Error>) -> Self { + Self { + id: Self::make_id(), + kind: ConnCommandKind::SeriesLookupResult(qu), + } + } + pub fn channel_add(name: String, cssid: ChannelStatusSeriesId) -> Self { Self { id: Self::make_id(), @@ -389,6 +398,21 @@ enum ChannelSetOp { Remove, } +struct SendSeriesLookup { + tx: Sender, +} + +impl CanSendChannelInfoResult for SendSeriesLookup { + fn make_send( + &self, + item: Result, dbpg::seriesbychannel::Error>, + ) -> dbpg::seriesbychannel::BoxedSend { + let tx = self.tx.clone(); + let fut = async move { tx.send(ConnCommand::series_lookup(item)).await.map_err(|_| ()) }; + Box::pin(fut) + } +} + struct ChannelOpsResources<'a> { channel_set_ops: &'a StdMutex>, channels: &'a mut BTreeMap, @@ -639,6 +663,7 @@ impl CaConn { self.cmd_shutdown(); Ready(Some(Ok(()))) } + ConnCommandKind::SeriesLookupResult(_) => todo!("TODO handle SeriesLookupResult"), } } Ready(None) => { @@ -962,24 +987,28 @@ impl CaConn { let _ = cx; loop { break if let Some(mut entry) = self.series_lookup_schedule.first_entry() { - let dummy = entry.get().dummy(); - let query = std::mem::replace(entry.get_mut(), dummy); - match self.channel_info_query_tx.try_send(query) { - Ok(()) => { - entry.remove(); - continue; - } - Err(e) => match e { - async_channel::TrySendError::Full(_) => { - warn!("series lookup channel full"); - *entry.get_mut() = e.into_inner(); - } - async_channel::TrySendError::Closed(_) => { - warn!("series lookup channel closed"); - // *entry.get_mut() = e.into_inner(); + todo!("emit_series_lookup"); + #[cfg(DISABLED)] + { + let dummy = entry.get().dummy(); + let query = std::mem::replace(entry.get_mut(), dummy); + match self.channel_info_query_tx.try_send(query) { + Ok(()) => { entry.remove(); + continue; } - }, + Err(e) => match e { + async_channel::TrySendError::Full(_) => { + warn!("series lookup channel full"); + *entry.get_mut() = e.into_inner(); + } + async_channel::TrySendError::Closed(_) => { + warn!("series lookup channel closed"); + // *entry.get_mut() = e.into_inner(); + entry.remove(); + } + }, + } } } else { () @@ -1439,29 +1468,32 @@ impl CaConn { *ch_s = ChannelState::FetchingSeriesId(created_state); // TODO handle error in different way. Should most likely not abort. 
if !self.series_lookup_schedule.contains_key(&cid) { - let (tx, rx) = async_channel::bounded(1); + let tx = SendSeriesLookup { + tx: self.conn_command_tx.clone(), + }; let query = ChannelInfoQuery { backend: self.backend.clone(), channel: name.clone(), scalar_type: scalar_type.to_scylla_i32(), shape_dims: shape.to_scylla_vec(), - tx, + tx: Box::pin(tx), }; self.series_lookup_schedule.insert(cid, query); - let fut = async move { - match rx.recv().await { - Ok(item) => match item { - Ok(item) => Ok((cid, sid, k.data_type, k.data_count, item)), - Err(e) => Err(Error::with_msg_no_trace(e.to_string())), - }, - Err(e) => { - // TODO count only - error!("can not receive series lookup result for {name} {e}"); - Err(Error::with_msg_no_trace("can not receive lookup result")) - } - } - }; - self.series_lookup_futs.push(Box::pin(fut)); + todo!("TODO discover the series lookup from main command queue"); + // let fut = async move { + // match rx.recv().await { + // Ok(item) => match item { + // Ok(item) => Ok((cid, sid, k.data_type, k.data_count, item)), + // Err(e) => Err(Error::with_msg_no_trace(e.to_string())), + // }, + // Err(e) => { + // // TODO count only + // error!("can not receive series lookup result for {name} {e}"); + // Err(Error::with_msg_no_trace("can not receive lookup result")) + // } + // } + // }; + // self.series_lookup_futs.push(Box::pin(fut)); } else { // TODO count only warn!("series lookup for {name} already in progress"); diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 0c92a71..f8e0ee8 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1,34 +1,77 @@ -use super::conn::CaConnEvent; -use super::conn::ConnCommand; +use super::findioc::FindIocRes; +use super::statemap; +use super::statemap::ChannelState; use crate::ca::conn::CaConn; +use crate::ca::conn::CaConnEvent; use crate::ca::conn::CaConnEventValue; use crate::ca::conn::CaConnOpts; +use crate::ca::conn::ConnCommand; +use crate::ca::statemap::CaConnState; +use crate::ca::statemap::ConnectionState; +use crate::ca::statemap::ConnectionStateValue; +use crate::ca::statemap::WithAddressState; +use crate::daemon_common::Channel; use crate::errconv::ErrConv; use crate::rt::JoinHandle; use crate::rt::TokMx; use async_channel::Receiver; use async_channel::Sender; +use atomic::AtomicUsize; +use dbpg::seriesbychannel::BoxedSend; +use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; -use netpod::log::*; +use log::*; +use netpod::Database; +use netpod::Shape; +use scywr::iteminsertqueue::ChannelStatusItem; use scywr::iteminsertqueue::QueryItem; +use series::series::Existence; use series::ChannelStatusSeriesId; +use series::SeriesId; +use statemap::ActiveChannelState; +use statemap::CaConnStateValue; +use statemap::ChannelStateMap; +use statemap::ChannelStateValue; +use statemap::WithStatusSeriesIdState; +use statemap::WithStatusSeriesIdStateInner; +use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; +use stats::CaConnSetStats; use stats::CaConnStats; use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::SocketAddr; use std::net::SocketAddrV4; +use std::sync::atomic; use std::sync::Arc; use std::time::Duration; use std::time::Instant; +use std::time::SystemTime; use taskrun::tokio; +const DO_ASSIGN_TO_CA_CONN: bool = true; +const CHECK_CHANS_PER_TICK: usize = 10000; +pub const SEARCH_BATCH_MAX: usize = 256; +pub const CURRENT_SEARCH_PENDING_MAX: usize = 
SEARCH_BATCH_MAX * 4; +const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000); +const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000); +const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); +const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000); + +// TODO put all these into metrics +static SEARCH_REQ_MARK_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_REQ_SEND_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_REQ_RECV_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_REQ_BATCH_SEND_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0); + #[derive(Debug, PartialEq, Eq)] pub struct CmdId(SocketAddrV4, usize); pub struct CaConnRes { + state: CaConnState, sender: Sender, stats: Arc, // TODO await on jh @@ -41,18 +84,37 @@ impl CaConnRes { } } +#[derive(Debug, Clone)] +pub struct ChannelAddWithAddr { + backend: String, + name: String, + local_epics_hostname: String, + cssid: ChannelStatusSeriesId, + addr: SocketAddr, +} + +#[derive(Debug, Clone)] +pub struct ChannelAddWithStatusId { + backend: String, + name: String, + local_epics_hostname: String, + cssid: ChannelStatusSeriesId, +} + #[derive(Debug, Clone)] pub struct ChannelAdd { backend: String, name: String, - addr: SocketAddr, - cssid: ChannelStatusSeriesId, local_epics_hostname: String, } #[derive(Debug)] pub enum ConnSetCmd { + SeriesLookupResult(Result, dbpg::seriesbychannel::Error>), ChannelAdd(ChannelAdd), + ChannelAddWithStatusId(ChannelAddWithStatusId), + ChannelAddWithAddr(ChannelAddWithAddr), + IocAddrQueryResult(VecDeque), CheckHealth, Shutdown, } @@ -69,19 +131,10 @@ pub struct CaConnSetCtrl { } impl CaConnSetCtrl { - pub async fn add_channel( - &self, - backend: String, - addr: SocketAddr, - name: String, - cssid: ChannelStatusSeriesId, - local_epics_hostname: String, - ) -> Result<(), Error> { + pub async fn add_channel(&self, backend: String, name: String, local_epics_hostname: String) -> Result<(), Error> { let cmd = ChannelAdd { backend, name, - addr, - cssid, local_epics_hostname, }; let cmd = ConnSetCmd::ChannelAdd(cmd); @@ -102,28 +155,65 @@ impl CaConnSetCtrl { } } +#[derive(Debug)] +pub struct IocAddrQuery { + pub name: String, +} + +struct SeriesLookupSender { + tx: Sender, +} + +impl CanSendChannelInfoResult for SeriesLookupSender { + fn make_send(&self, item: Result, dbpg::seriesbychannel::Error>) -> BoxedSend { + let tx = self.tx.clone(); + let fut = async move { + tx.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::SeriesLookupResult(item))) + .await + .map_err(|_| ()) + }; + Box::pin(fut) + } +} + pub struct CaConnSet { + backend: String, + local_epics_hostname: String, + search_tx: Sender, ca_conn_ress: BTreeMap, + channel_states: ChannelStateMap, connset_tx: Sender, connset_rx: Receiver, channel_info_query_tx: Sender, storage_insert_tx: Sender, shutdown: bool, + chan_check_next: Option, + stats: CaConnSetStats, } impl CaConnSet { pub fn start( + backend: String, + local_epics_hostname: String, storage_insert_tx: Sender, channel_info_query_tx: Sender, + pgconf: Database, ) -> CaConnSetCtrl { let (connset_tx, connset_rx) = async_channel::bounded(10000); + let (search_tx, ioc_finder_jh) = super::finder::start_finder(connset_tx.clone(), backend.clone(), pgconf); let connset = Self { + backend, + local_epics_hostname, + search_tx, ca_conn_ress: BTreeMap::new(), + channel_states: ChannelStateMap::new(), connset_tx: connset_tx.clone(), connset_rx, channel_info_query_tx, storage_insert_tx, 
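// shutdown is flipped by ConnSetCmd::Shutdown in handle_event;
// chan_check_next remembers where the incremental channel-state scan
// stopped, so check_channel_states can resume there on the next tick
// instead of walking the whole map (CHECK_CHANS_PER_TICK entries per pass).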
shutdown: false, + chan_check_next: None, + stats: CaConnSetStats::new(), }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); @@ -150,7 +240,40 @@ impl CaConnSet { async fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> { match ev { CaConnSetEvent::ConnSetCmd(cmd) => match cmd { - ConnSetCmd::ChannelAdd(x) => self.add_channel_to_addr(x).await, + ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x).await, + ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await, + ConnSetCmd::IocAddrQueryResult(res) => { + for e in res { + if let Some(addr) = e.addr { + let ch = Channel::new(e.channel.clone()); + if let Some(chst) = self.channel_states.inner().get(&ch) { + if let ChannelStateValue::Active(ast) = &chst.value { + if let ActiveChannelState::WithStatusSeriesId { + status_series_id, + state, + } = ast + { + let add = ChannelAddWithAddr { + backend: self.backend.clone(), + name: e.channel, + addr: SocketAddr::V4(addr), + cssid: status_series_id.clone(), + local_epics_hostname: self.local_epics_hostname.clone(), + }; + } else { + warn!("TODO got address but no longer active"); + } + } else { + warn!("TODO got address but no longer active"); + } + } else { + warn!("ioc addr lookup done but channel no longer here"); + } + } + } + Ok(()) + } + ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await, ConnSetCmd::CheckHealth => { error!("TODO implement check health"); Ok(()) @@ -160,6 +283,7 @@ impl CaConnSet { self.shutdown = true; Ok(()) } + ConnSetCmd::SeriesLookupResult(_) => todo!(), }, CaConnSetEvent::CaConnEvent((addr, ev)) => match ev.value { CaConnEventValue::None => Ok(()), @@ -174,7 +298,53 @@ impl CaConnSet { } } - async fn add_channel_to_addr(&mut self, add: ChannelAdd) -> Result<(), Error> { + async fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> { + // TODO should I add the transition through ActiveChannelState::Init as well? + let ch = Channel::new(add.name.clone()); + let _st = self.channel_states.inner().entry(ch).or_insert_with(|| ChannelState { + value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { + since: SystemTime::now(), + }), + }); + let item = ChannelInfoQuery { + backend: add.backend, + channel: add.name, + scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE, + shape_dims: Vec::new(), + tx: Box::pin(SeriesLookupSender { + tx: self.connset_tx.clone(), + }), + }; + self.channel_info_query_tx.send(item).await?; + Ok(()) + } + + async fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> { + let ch = Channel::new(add.name.clone()); + if let Some(chst) = self.channel_states.inner().get_mut(&ch) { + if let ChannelStateValue::Active(chst2) = &mut chst.value { + if let ActiveChannelState::WaitForStatusSeriesId { .. 
} = chst2 { + *chst2 = ActiveChannelState::WithStatusSeriesId { + status_series_id: add.cssid, + state: WithStatusSeriesIdState { + inner: WithStatusSeriesIdStateInner::NoAddress { + since: SystemTime::now(), + }, + }, + }; + } else { + warn!("TODO have a status series id but no more channel"); + } + } else { + warn!("TODO have a status series id but no more channel"); + } + } else { + warn!("TODO have a status series id but no more channel"); + } + Ok(()) + } + + async fn handle_add_channel_with_addr(&mut self, add: ChannelAddWithAddr) -> Result<(), Error> { if !self.ca_conn_ress.contains_key(&add.addr) { let c = self.create_ca_conn(add.clone())?; self.ca_conn_ress.insert(add.addr, c); @@ -185,7 +355,7 @@ impl CaConnSet { Ok(()) } - fn create_ca_conn(&self, add: ChannelAdd) -> Result { + fn create_ca_conn(&self, add: ChannelAddWithAddr) -> Result { // TODO should we save this as event? let opts = CaConnOpts::default(); let addr = add.addr; @@ -207,6 +377,7 @@ impl CaConnSet { let conn_item_tx = self.connset_tx.clone(); let jh = tokio::spawn(Self::ca_conn_item_merge(conn, conn_item_tx, addr_v4)); let ca_conn_res = CaConnRes { + state: CaConnState::new(CaConnStateValue::Fresh), sender: conn_tx, stats: conn_stats, jh, @@ -354,4 +525,221 @@ impl CaConnSet { Ok(false) } } + + fn check_connection_states(&mut self) -> Result<(), Error> { + let tsnow = Instant::now(); + for (addr, val) in &mut self.ca_conn_ress { + let state = &mut val.state; + let v = &mut state.value; + match v { + CaConnStateValue::Fresh => { + // TODO check for delta t since last issued status command. + if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) { + error!("TODO Fresh timeout send connection-close for {addr}"); + // TODO collect in metrics + // self.stats.ca_conn_status_feedback_timeout_inc(); + // TODO send shutdown to this CaConn, check that we've received + // a 'shutdown' state from it. (see below) + *v = CaConnStateValue::Shutdown { since: tsnow }; + } + } + CaConnStateValue::HadFeedback => { + // TODO check for delta t since last issued status command. + if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) { + error!("TODO HadFeedback timeout send connection-close for {addr}"); + // TODO collect in metrics + // self.stats.ca_conn_status_feedback_timeout_inc(); + *v = CaConnStateValue::Shutdown { since: tsnow }; + } + } + CaConnStateValue::Shutdown { since } => { + if tsnow.saturating_duration_since(*since) > Duration::from_millis(10000) { + // TODO collect in metrics as severe error, this would be a bug. + // self.stats.critical_error_inc(); + error!("Shutdown of CaConn failed for {addr}"); + } + } + } + } + Ok(()) + } + + async fn check_channel_states(&mut self) -> Result<(), Error> { + let (mut search_pending_count,) = self.update_channel_state_counts(); + let k = self.chan_check_next.take(); + let it = if let Some(last) = k { + trace!("check_chans start at {:?}", last); + self.channel_states.inner().range_mut(last..) + } else { + self.channel_states.inner().range_mut(..) 
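// (fresh pass over the whole map; once the loop below has visited
// CHECK_CHANS_PER_TICK entries it stores the follow-up key in
// chan_check_next so the next call resumes from there)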
+ }; + let tsnow = SystemTime::now(); + let mut attempt_series_search = true; + for (i, (ch, st)) in it.enumerate() { + match &mut st.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::Init { since: _ } => { + todo!() + } + ActiveChannelState::WaitForStatusSeriesId { since } => { + let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + if dt > Duration::from_millis(5000) { + warn!("timeout can not get status series id for {ch:?}"); + *st2 = ActiveChannelState::Init { since: tsnow }; + } else { + // TODO + } + } + ActiveChannelState::WithStatusSeriesId { + status_series_id, + state, + } => match &mut state.inner { + WithStatusSeriesIdStateInner::UnknownAddress { since } => { + let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + if dt > UNKNOWN_ADDRESS_STAY { + //info!("UnknownAddress {} {:?}", i, ch); + if (search_pending_count as usize) < CURRENT_SEARCH_PENDING_MAX { + search_pending_count += 1; + state.inner = WithStatusSeriesIdStateInner::SearchPending { + since: tsnow, + did_send: false, + }; + SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel); + } + } + } + WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => { + //info!("SearchPending {} {:?}", i, ch); + let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + if dt > SEARCH_PENDING_TIMEOUT { + info!("Search timeout for {ch:?}"); + state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow }; + search_pending_count -= 1; + } + } + WithStatusSeriesIdStateInner::WithAddress { addr: addr_v4, state } => { + //info!("WithAddress {} {:?}", i, ch); + use WithAddressState::*; + match state { + Unassigned { assign_at } => { + // TODO do I need this case anymore? + #[cfg(DISABLED)] + if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow { + let backend = self.backend.clone(); + let addr = SocketAddr::V4(*addr_v4); + let name = ch.id().into(); + let cssid = status_series_id.clone(); + let local_epics_hostname = self.local_epics_hostname.clone(); + // This operation is meant to complete very quickly + let add = ChannelAdd { + backend: backend, + name: name, + addr, + cssid, + local_epics_hostname, + }; + self.handle_add_channel(add).await?; + let cs = ConnectionState { + updated: tsnow, + value: ConnectionStateValue::Unconnected, + }; + // TODO if a matching CaConn does not yet exist, it gets created + // via the command through the channel, so we can not await it here. + // Therefore, would be good to have a separate status entry out of + // the ca_conn_ress right here in a sync fashion. + *state = WithAddressState::Assigned(cs); + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: tsnow, + series: SeriesId::new(status_series_id.id()), + status: scywr::iteminsertqueue::ChannelStatus::AssignedToAddress, + }); + match self.storage_insert_tx.send(item).await { + Ok(_) => {} + Err(_) => { + // TODO feed into throttled log, or count as unlogged + } + } + } + } + Assigned(_) => { + // TODO check if channel is healthy and alive + } + } + } + WithStatusSeriesIdStateInner::NoAddress { since } => { + let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + if dt > NO_ADDRESS_STAY { + state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow }; + } + } + }, + }, + ChannelStateValue::ToRemove { .. 
} => { + // TODO if assigned to some address, + } + } + if i >= CHECK_CHANS_PER_TICK { + self.chan_check_next = Some(ch.clone()); + break; + } + } + Ok(()) + } + + fn update_channel_state_counts(&mut self) -> (u64,) { + let mut unknown_address_count = 0; + let mut search_pending_count = 0; + let mut search_pending_did_send_count = 0; + let mut unassigned_count = 0; + let mut assigned_count = 0; + let mut no_address_count = 0; + for (_ch, st) in self.channel_states.inner().iter() { + match &st.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::Init { .. } => { + unknown_address_count += 1; + } + ActiveChannelState::WaitForStatusSeriesId { .. } => { + unknown_address_count += 1; + } + ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner { + WithStatusSeriesIdStateInner::UnknownAddress { .. } => { + unknown_address_count += 1; + } + WithStatusSeriesIdStateInner::SearchPending { did_send, .. } => { + if *did_send { + search_pending_did_send_count += 1; + } else { + search_pending_count += 1; + } + } + WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state { + WithAddressState::Unassigned { .. } => { + unassigned_count += 1; + } + WithAddressState::Assigned(_) => { + assigned_count += 1; + } + }, + WithStatusSeriesIdStateInner::NoAddress { .. } => { + no_address_count += 1; + } + }, + }, + ChannelStateValue::ToRemove { .. } => { + unknown_address_count += 1; + } + } + } + use atomic::Ordering::Release; + self.stats.channel_unknown_address.store(unknown_address_count, Release); + self.stats.channel_search_pending.store(search_pending_count, Release); + self.stats + .search_pending_did_send + .store(search_pending_did_send_count, Release); + self.stats.unassigned.store(unassigned_count, Release); + self.stats.assigned.store(assigned_count, Release); + self.stats.channel_no_address.store(no_address_count, Release); + (search_pending_count,) + } } diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs new file mode 100644 index 0000000..26cfc97 --- /dev/null +++ b/netfetch/src/ca/finder.rs @@ -0,0 +1,355 @@ +use super::connset::CaConnSetEvent; +use super::connset::IocAddrQuery; +use super::connset::CURRENT_SEARCH_PENDING_MAX; +use super::connset::SEARCH_BATCH_MAX; +use crate::ca::findioc::FindIocRes; +use crate::ca::findioc::FindIocStream; +use crate::daemon_common::DaemonEvent; +use async_channel::Receiver; +use async_channel::Sender; +use dbpg::conn::make_pg_client; +use dbpg::postgres::Row as PgRow; +use err::Error; +use futures_util::FutureExt; +use futures_util::StreamExt; +use log::*; +use netpod::Database; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::net::SocketAddrV4; +use std::sync::atomic; +use std::sync::atomic::AtomicUsize; +use std::time::Duration; +use std::time::Instant; +use taskrun::tokio; +use tokio::task::JoinHandle; + +const SEARCH_DB_PIPELINE_LEN: usize = 4; +const FINDER_JOB_QUEUE_LEN_MAX: usize = 10; +const FINDER_BATCH_SIZE: usize = 8; +const FINDER_IN_FLIGHT_MAX: usize = 800; +const FINDER_TIMEOUT: Duration = Duration::from_millis(100); + +// TODO pull out into a stats +static SEARCH_REQ_BATCH_RECV_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_RES_0_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_RES_1_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_RES_2_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_RES_3_COUNT: AtomicUsize = AtomicUsize::new(0); + +#[allow(unused)] +macro_rules! 
+#[allow(unused)]
+macro_rules! debug_batch {
+    // (D$($arg:tt)*) => ();
+    ($($arg:tt)*) => (if false {
+        debug!($($arg)*);
+    });
+}
+
+#[allow(unused)]
+macro_rules! trace_batch {
+    // (D$($arg:tt)*) => ();
+    ($($arg:tt)*) => (if false {
+        trace!($($arg)*);
+    });
+}
+
+#[derive(Debug)]
+pub struct IocAddrQueryResult {}
+
+fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
+    let mut ret = VecDeque::new();
+    for row in rows {
+        let ch: Result<String, _> = row.try_get(0);
+        if let Ok(ch) = ch {
+            if let Some(addr) = row.get::<_, Option<String>>(1) {
+                let addr = addr.parse().ok();
+                let item = FindIocRes {
+                    channel: ch,
+                    response_addr: None,
+                    addr,
+                    dt: Duration::from_millis(0),
+                };
+                ret.push_back(item);
+            } else {
+                let item = FindIocRes {
+                    channel: ch,
+                    response_addr: None,
+                    addr: None,
+                    dt: Duration::from_millis(0),
+                };
+                ret.push_back(item);
+            }
+        } else if let Err(e) = ch {
+            error!("bad string from pg: {e:?}");
+        }
+    }
+    ret
+}
+
+async fn finder_worker_single(
+    inp: Receiver<Vec<IocAddrQuery>>,
+    tx: Sender<CaConnSetEvent>,
+    backend: String,
+    db: Database,
+) -> Result<(), Error> {
+    let pg = make_pg_client(&db)
+        .await
+        .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
+    let sql = concat!(
+        "with q1 as (select * from unnest($2::text[]) as unn (ch))",
+        " select distinct on (tt.facility, tt.channel) tt.channel, tt.addr",
+        " from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null",
+        " order by tt.facility, tt.channel, tsmod desc",
+    );
+    let qu_select_multi = pg
+        .prepare(sql)
+        .await
+        .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
+    let mut resdiff = 0;
+    loop {
+        match inp.recv().await {
+            Ok(batch) => {
+                SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel);
+                let ts1 = Instant::now();
+                debug_batch!("run query batch len {}", batch.len());
+                let names: Vec<_> = batch.iter().map(|x| x.name.as_str()).collect();
+                let qres = pg.query(&qu_select_multi, &[&backend, &names]).await;
+                let dt = ts1.elapsed();
+                debug_batch!(
+                    "done query batch len {}: {} {:.3}ms",
+                    batch.len(),
+                    qres.is_ok(),
+                    dt.as_secs_f32() * 1e3
+                );
+                if dt > Duration::from_millis(5000) {
+                    let mut out = String::from("[");
+                    for e in &batch {
+                        if out.len() > 1 {
+                            out.push_str(", ");
+                        }
+                        out.push('\'');
+                        out.push_str(&e.name);
+                        out.push('\'');
+                    }
+                    out.push(']');
+                    eprintln!("VERY SLOW QUERY\n{out}");
+                }
+                match qres {
+                    Ok(rows) => {
+                        if rows.len() > batch.len() {
+                            error!("MORE RESULTS THAN INPUT");
+                        } else if rows.len() < batch.len() {
+                            resdiff += batch.len() - rows.len();
+                        }
+                        let nbatch = batch.len();
+                        trace_batch!("received results {} resdiff {}", rows.len(), resdiff);
+                        SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel);
+                        let items = transform_pgres(rows);
+                        let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect();
+                        let mut to_add = Vec::new();
+                        for e in batch {
+                            let s = e.name;
+                            if !names.contains_key(&s) {
+                                let item = FindIocRes {
+                                    channel: s,
+                                    response_addr: None,
+                                    addr: None,
+                                    dt: Duration::from_millis(0),
+                                };
+                                to_add.push(item);
+                            }
+                        }
+                        SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
+                        SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel);
+                        let mut items = items;
+                        items.extend(to_add.into_iter());
+                        if items.len() != nbatch {
+                            error!("STILL NOT MATCHING LEN");
+                        }
+                        SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
+                        let x = tx
+                            .send(CaConnSetEvent::ConnSetCmd(
+                                crate::ca::connset::ConnSetCmd::IocAddrQueryResult(items),
+                            ))
+                            .await;
+                        match x {
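+                            // A send error means the connset event loop is gone and
+                            // nobody consumes search results anymore; stop this worker.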
+                            Ok(_) => {}
+                            Err(e) => {
+                                error!("finder can no longer deliver results: {e}");
+                                break;
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        error!("finder query error: {e}");
+                        tokio::time::sleep(Duration::from_millis(1000)).await;
+                    }
+                }
+            }
+            Err(_e) => break,
+        }
+    }
+    Ok(())
+}
+
+async fn finder_worker(
+    qrx: Receiver<IocAddrQuery>,
+    tx: Sender<CaConnSetEvent>,
+    backend: String,
+    db: Database,
+) -> Result<(), Error> {
+    // TODO do something with the join handle
+    let (batch_rx, _jh) = batchtools::batcher::batch(
+        SEARCH_BATCH_MAX,
+        Duration::from_millis(200),
+        SEARCH_DB_PIPELINE_LEN,
+        qrx,
+    );
+    for _ in 0..SEARCH_DB_PIPELINE_LEN {
+        // TODO use the join handle
+        tokio::spawn(finder_worker_single(
+            batch_rx.clone(),
+            tx.clone(),
+            backend.clone(),
+            db.clone(),
+        ));
+    }
+    Ok(())
+}
+
+pub fn start_finder(
+    tx: Sender<CaConnSetEvent>,
+    backend: String,
+    db: Database,
+) -> (Sender<IocAddrQuery>, JoinHandle<Result<(), Error>>) {
+    let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX);
+    let jh = taskrun::spawn(finder_worker(qrx, tx, backend, db));
+    (qtx, jh)
+}
+
+/// Holds an optional future; while empty it is disabled and polls as Pending.
+struct OptFut<F> {
+    fut: Option<F>,
+}
+
+impl<F> OptFut<F> {
+    fn empty() -> Self {
+        Self { fut: None }
+    }
+
+    fn new(fut: F) -> Self {
+        Self { fut: Some(fut) }
+    }
+
+    fn is_enabled(&self) -> bool {
+        self.fut.is_some()
+    }
+}
+
+impl<F> futures_util::Future for OptFut<F>
+where
+    F: futures_util::Future + std::marker::Unpin,
+{
+    type Output = <F as futures_util::Future>::Output;
+
+    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
+        match self.fut.as_mut() {
+            Some(fut) => fut.poll_unpin(cx),
+            None => std::task::Poll::Pending,
+        }
+    }
+}
+
+#[allow(unused)]
+fn start_finder_ca(tx: Sender<DaemonEvent>, tgts: Vec<SocketAddrV4>) -> (Sender<String>, JoinHandle<()>) {
+    let (qtx, qrx) = async_channel::bounded(32);
+    let (atx, arx) = async_channel::bounded(32);
+    let ioc_finder_fut = async move {
+        let mut finder = FindIocStream::new(tgts, FINDER_TIMEOUT, FINDER_IN_FLIGHT_MAX, FINDER_BATCH_SIZE);
+        let fut_tick_dur = Duration::from_millis(100);
+        let mut finder_more = true;
+        let mut finder_fut = OptFut::new(finder.next());
+        let mut qrx_fut = OptFut::new(qrx.recv());
+        let mut qrx_more = true;
+        let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
+        let mut asend = OptFut::empty();
+        loop {
+            tokio::select! {
+                _ = &mut asend, if asend.is_enabled() => {
+                    asend = OptFut::empty();
+                }
+                r1 = &mut finder_fut, if finder_fut.is_enabled() => {
+                    finder_fut = OptFut::empty();
+                    match r1 {
+                        Some(item) => {
+                            asend = OptFut::new(atx.send(item));
+                        }
+                        None => {
+                            // TODO the finder has stopped; stop polling it.
+                            warn!("Finder has stopped");
+                            finder_more = false;
+                        }
+                    }
+                    if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
+                        qrx_fut = OptFut::new(qrx.recv());
+                    }
+                    if finder_more {
+                        finder_fut = OptFut::new(finder.next());
+                    }
+                    fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
+                }
+                r2 = &mut qrx_fut, if qrx_fut.is_enabled() => {
+                    qrx_fut = OptFut::empty();
+                    match r2 {
+                        Ok(item) => {
+                            finder.push(item);
+                        }
+                        Err(e) => {
+                            // TODO the input is done; ignore it from here on.
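+                            // The request channel is closed: stop polling it, but keep
+                            // driving the finder so in-flight searches can still complete.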
+                            error!("Finder input channel error {e}");
+                            qrx_more = false;
+                        }
+                    }
+                    if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
+                        qrx_fut = OptFut::new(qrx.recv());
+                    }
+                    if finder_more {
+                        finder_fut = OptFut::new(finder.next());
+                    } else {
+                        finder_fut = OptFut::empty();
+                    }
+                    fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
+                }
+                _ = &mut fut_tick => {
+                    if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX {
+                        qrx_fut = OptFut::new(qrx.recv());
+                    }
+                    if finder_more {
+                        finder_fut = OptFut::new(finder.next());
+                    } else {
+                        finder_fut = OptFut::empty();
+                    }
+                    fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur));
+                }
+                else => {
+                    error!("all branches are disabled");
+                    break;
+                }
+            };
+        }
+    };
+    let ioc_finder_jh = taskrun::spawn(ioc_finder_fut);
+    taskrun::spawn({
+        async move {
+            while let Ok(item) = arx.recv().await {
+                match tx.send(DaemonEvent::SearchDone(item)).await {
+                    Ok(_) => {}
+                    Err(e) => {
+                        error!("search res fwd {e}");
+                    }
+                }
+            }
+            warn!("search res fwd input broken");
+        }
+    });
+    (qtx, ioc_finder_jh)
+}
diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs
new file mode 100644
index 0000000..f1aca41
--- /dev/null
+++ b/netfetch/src/ca/statemap.rs
@@ -0,0 +1,123 @@
+use crate::daemon_common::Channel;
+use async_channel::Receiver;
+use serde::Serialize;
+use series::series::Existence;
+use series::ChannelStatusSeriesId;
+use series::SeriesId;
+use std::collections::BTreeMap;
+use std::net::SocketAddrV4;
+use std::time::Instant;
+use std::time::SystemTime;
+
+pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = i32::MIN + 1;
+
+#[derive(Debug)]
+pub enum CaConnStateValue {
+    Fresh,
+    HadFeedback,
+    Shutdown { since: Instant },
+}
+
+#[derive(Debug)]
+pub struct CaConnState {
+    pub last_feedback: Instant,
+    pub value: CaConnStateValue,
+}
+
+impl CaConnState {
+    pub fn new(value: CaConnStateValue) -> Self {
+        Self {
+            last_feedback: Instant::now(),
+            value,
+        }
+    }
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub enum ConnectionStateValue {
+    Unconnected,
+    Connected {
+        //#[serde(with = "serde_Instant")]
+        since: SystemTime,
+    },
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct ConnectionState {
+    //#[serde(with = "serde_Instant")]
+    pub updated: SystemTime,
+    pub value: ConnectionStateValue,
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub enum WithAddressState {
+    Unassigned {
+        //#[serde(with = "serde_Instant")]
+        assign_at: SystemTime,
+    },
+    Assigned(ConnectionState),
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub enum WithStatusSeriesIdStateInner {
+    UnknownAddress {
+        since: SystemTime,
+    },
+    SearchPending {
+        //#[serde(with = "serde_Instant")]
+        since: SystemTime,
+        did_send: bool,
+    },
+    WithAddress {
+        addr: SocketAddrV4,
+        state: WithAddressState,
+    },
+    NoAddress {
+        since: SystemTime,
+    },
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct WithStatusSeriesIdState {
+    pub inner: WithStatusSeriesIdStateInner,
+}
+
+#[derive(Clone, Debug)]
+pub enum ActiveChannelState {
+    Init {
+        since: SystemTime,
+    },
+    WaitForStatusSeriesId {
+        since: SystemTime,
+    },
+    WithStatusSeriesId {
+        status_series_id: ChannelStatusSeriesId,
+        state: WithStatusSeriesIdState,
+    },
+}
+
+#[derive(Debug)]
+pub enum ChannelStateValue {
+    Active(ActiveChannelState),
+    ToRemove { addr: Option<SocketAddrV4> },
+}
+
+#[derive(Debug)]
+pub struct ChannelState {
+    pub value: ChannelStateValue,
+}
+
+#[derive(Debug)]
+pub struct ChannelStateMap {
+    map: BTreeMap<Channel, ChannelState>,
+}
+
+impl ChannelStateMap {
+    pub fn new() -> Self {
+        Self { map: BTreeMap::new() }
+    }
+
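+    /// Mutable access to the underlying map; the channel check loop iterates
+    /// this directly and updates per-channel state in place.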
+    pub fn inner(&mut self) -> &mut BTreeMap<Channel, ChannelState> {
+        &mut self.map
+    }
+}
diff --git a/stats/src/stats.rs b/stats/src/stats.rs
index 2ccdabb..5253efe 100644
--- a/stats/src/stats.rs
+++ b/stats/src/stats.rs
@@ -207,6 +207,23 @@ impl IntervalEma {
     }
 }
 
+stats_proc::stats_struct!((
+    stats_struct(
+        name(CaConnSetStats),
+        counters(
+            channel_unknown_address,
+            channel_with_address,
+            channel_search_pending,
+            search_pending_did_send,
+            channel_no_address,
+            unassigned,
+            assigned,
+        ),
+    ),
+    // agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)),
+    diff(name(CaConnSetStatsDiff), input(CaConnSetStats)),
+));
+
 stats_proc::stats_struct!((
     stats_struct(
         name(CaConnStats),