From 7a10a740f62b5418120e229d3602258a0b1b3431 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 14 Feb 2023 16:31:48 +0100 Subject: [PATCH] Pass channel status series id --- daqingest/src/daemon.rs | 526 ++++++++++++------- netfetch/src/batchquery/series_by_channel.rs | 45 +- netfetch/src/ca/conn.rs | 78 ++- netfetch/src/ca/connset.rs | 13 +- netfetch/src/series.rs | 13 + netfetch/src/store.rs | 9 +- 6 files changed, 447 insertions(+), 237 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 991b229..5d0f6df 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -5,6 +5,7 @@ use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; use log::*; +use netfetch::batchquery::series_by_channel::ChannelInfoQuery; use netfetch::ca::conn::CaConnEvent; use netfetch::ca::conn::ConnCommand; use netfetch::ca::connset::CaConnSet; @@ -20,6 +21,11 @@ use netfetch::errconv::ErrConv; use netfetch::insertworker::Ttls; use netfetch::metrics::ExtraInsertsConf; use netfetch::metrics::StatsSet; +use netfetch::series::ChannelStatusSeriesId; +use netfetch::series::Existence; +use netfetch::series::SeriesId; +use netfetch::store::ChannelStatus; +use netfetch::store::ChannelStatusItem; use netfetch::store::CommonInsertItemQueue; use netfetch::store::ConnectionStatus; use netfetch::store::ConnectionStatusItem; @@ -52,6 +58,7 @@ const FINDER_IN_FLIGHT_MAX: usize = 800; const FINDER_BATCH_SIZE: usize = 8; const CHECK_CHANS_PER_TICK: usize = 10000; const CA_CONN_INSERT_QUEUE_MAX: usize = 256; +const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = i32::MIN + 1; const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000); const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000); @@ -131,7 +138,7 @@ pub enum WithAddressState { } #[derive(Clone, Debug, Serialize)] -pub enum ActiveChannelState { +pub enum WithStatusSeriesIdStateInner { UnknownAddress { since: SystemTime, }, @@ -149,6 +156,26 @@ pub enum ActiveChannelState { }, } +#[derive(Clone, Debug, Serialize)] +pub struct WithStatusSeriesIdState { + inner: WithStatusSeriesIdStateInner, +} + +#[derive(Clone, Debug)] +pub enum ActiveChannelState { + Init { + since: SystemTime, + }, + WaitForStatusSeriesId { + since: SystemTime, + rx: Receiver, Error>>, + }, + WithStatusSeriesId { + status_series_id: ChannelStatusSeriesId, + state: WithStatusSeriesIdState, + }, +} + #[derive(Debug)] pub enum ChannelStateValue { Active(ActiveChannelState), @@ -259,6 +286,7 @@ pub struct Daemon { stats: Arc, shutting_down: bool, insert_rx_weak: WeakReceiver, + channel_info_query_tx: Sender, } impl Daemon { @@ -428,7 +456,7 @@ impl Daemon { } }; // TODO await on shutdown - let jh = tokio::spawn(fut); + let _jh = tokio::spawn(fut); //let mut jhs = Vec::new(); //jhs.push(jh); //futures_util::future::join_all(jhs).await; @@ -460,6 +488,7 @@ impl Daemon { stats: Arc::new(DaemonStats::new()), shutting_down: false, insert_rx_weak, + channel_info_query_tx, }; Ok(ret) } @@ -758,26 +787,34 @@ impl Daemon { for (_ch, st) in &self.channel_states { match &st.value { ChannelStateValue::Active(st2) => match st2 { - ActiveChannelState::UnknownAddress { since: _ } => { + ActiveChannelState::Init { .. } => { without_address_count += 1; } - ActiveChannelState::SearchPending { since: _, did_send: _ } => { - currently_search_pending += 1; + ActiveChannelState::WaitForStatusSeriesId { .. } => { without_address_count += 1; } - ActiveChannelState::WithAddress { addr: _, state } => match state { - WithAddressState::Unassigned { assign_at: _ } => { - with_address_count += 1; + ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner { + WithStatusSeriesIdStateInner::UnknownAddress { .. } => { + without_address_count += 1; } - WithAddressState::Assigned(_) => { - with_address_count += 1; + WithStatusSeriesIdStateInner::SearchPending { .. } => { + currently_search_pending += 1; + without_address_count += 1; + } + WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state { + WithAddressState::Unassigned { .. } => { + with_address_count += 1; + } + WithAddressState::Assigned(_) => { + with_address_count += 1; + } + }, + WithStatusSeriesIdStateInner::NoAddress { .. } => { + without_address_count += 1; } }, - ActiveChannelState::NoAddress { since: _ } => { - without_address_count += 1; - } }, - ChannelStateValue::ToRemove { addr: _ } => { + ChannelStateValue::ToRemove { .. } => { with_address_count += 1; } } @@ -797,85 +834,151 @@ impl Daemon { self.channel_states.range_mut(..) }; let tsnow = SystemTime::now(); + let mut attempt_series_search = true; for (i, (ch, st)) in it.enumerate() { - use ActiveChannelState::*; - use ChannelStateValue::*; match &mut st.value { - Active(st2) => match st2 { - UnknownAddress { since } => { - let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); - if dt > UNKNOWN_ADDRESS_STAY { - //info!("UnknownAddress {} {:?}", i, ch); - if currently_search_pending < CURRENT_SEARCH_PENDING_MAX { - currently_search_pending += 1; - st.value = Active(SearchPending { - since: tsnow, - did_send: false, - }); - SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel); + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::Init { since: _ } => { + let (tx, rx) = async_channel::bounded(1); + let q = ChannelInfoQuery { + backend: self.ingest_commons.backend.clone(), + channel: ch.id().into(), + scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE, + shape_dims: Vec::new(), + tx, + }; + if attempt_series_search { + match self.channel_info_query_tx.try_send(q) { + Ok(()) => { + *st2 = ActiveChannelState::WaitForStatusSeriesId { since: tsnow, rx }; + } + Err(e) => match e { + _ => { + attempt_series_search = false; + } + }, } } } - SearchPending { since, did_send: _ } => { - //info!("SearchPending {} {:?}", i, ch); + ActiveChannelState::WaitForStatusSeriesId { since, rx } => { let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); - if dt > SEARCH_PENDING_TIMEOUT { - info!("Search timeout for {ch:?}"); - st.value = Active(ActiveChannelState::NoAddress { since: tsnow }); - currently_search_pending -= 1; + if dt > Duration::from_millis(5000) { + warn!("timeout can not get status series id"); + *st2 = ActiveChannelState::Init { since: tsnow }; + } else { + match rx.try_recv() { + Ok(x) => match x { + Ok(x) => { + //info!("received status series id: {x:?}"); + *st2 = ActiveChannelState::WithStatusSeriesId { + status_series_id: ChannelStatusSeriesId::new(x.into_inner().id()), + state: WithStatusSeriesIdState { + inner: WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow }, + }, + }; + } + Err(e) => { + error!("could not get a status series id"); + } + }, + Err(e) => {} + } } } - WithAddress { addr, state } => { - //info!("WithAddress {} {:?}", i, ch); - use WithAddressState::*; - match state { - Unassigned { assign_at } => { - if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow { - let backend = self.opts.backend().into(); - let channel_name = ch.id().into(); - // This operation is meant to complete very quickly - self.ingest_commons - .ca_conn_set - .add_channel_to_addr( - backend, - *addr, - channel_name, - &self.common_insert_item_queue, - &self.datastore, - CA_CONN_INSERT_QUEUE_MAX, - self.opts.array_truncate, - self.opts.local_epics_hostname.clone(), - ) - .slow_warn(2000) - .instrument(info_span!("add_channel_to_addr")) - .await?; - let cs = ConnectionState { - updated: tsnow, - value: ConnectionStateValue::Unconnected, + ActiveChannelState::WithStatusSeriesId { + status_series_id, + state, + } => match &mut state.inner { + WithStatusSeriesIdStateInner::UnknownAddress { since } => { + let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + if dt > UNKNOWN_ADDRESS_STAY { + //info!("UnknownAddress {} {:?}", i, ch); + if currently_search_pending < CURRENT_SEARCH_PENDING_MAX { + currently_search_pending += 1; + state.inner = WithStatusSeriesIdStateInner::SearchPending { + since: tsnow, + did_send: false, }; - *state = WithAddressState::Assigned(cs); - self.connection_states.entry(*addr).or_insert_with(|| { - let t = CaConnState { - last_feedback: Instant::now(), - value: CaConnStateValue::Fresh, - }; - t - }); + SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel); } } - Assigned(_) => { - // TODO check if channel is healthy and alive + } + WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => { + //info!("SearchPending {} {:?}", i, ch); + let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + if dt > SEARCH_PENDING_TIMEOUT { + info!("Search timeout for {ch:?}"); + state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow }; + currently_search_pending -= 1; } } - } - NoAddress { since } => { - let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); - if dt > NO_ADDRESS_STAY { - st.value = Active(ActiveChannelState::UnknownAddress { since: tsnow }); + WithStatusSeriesIdStateInner::WithAddress { addr, state } => { + //info!("WithAddress {} {:?}", i, ch); + use WithAddressState::*; + match state { + Unassigned { assign_at } => { + if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow { + let backend = self.opts.backend().into(); + let channel_name = ch.id().into(); + // This operation is meant to complete very quickly + self.ingest_commons + .ca_conn_set + .add_channel_to_addr( + backend, + *addr, + channel_name, + status_series_id.clone(), + &self.common_insert_item_queue, + &self.datastore, + CA_CONN_INSERT_QUEUE_MAX, + self.opts.array_truncate, + self.opts.local_epics_hostname.clone(), + ) + .slow_warn(2000) + .instrument(info_span!("add_channel_to_addr")) + .await?; + let cs = ConnectionState { + updated: tsnow, + value: ConnectionStateValue::Unconnected, + }; + *state = WithAddressState::Assigned(cs); + self.connection_states.entry(*addr).or_insert_with(|| { + let t = CaConnState { + last_feedback: Instant::now(), + value: CaConnStateValue::Fresh, + }; + t + }); + // TODO move await out of here + if let Some(tx) = self.ingest_commons.insert_item_queue.sender() { + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: tsnow, + series: SeriesId::new(status_series_id.id()), + status: ChannelStatus::AssignedToAddress, + }); + match tx.send(item).await { + Ok(_) => {} + Err(_) => { + // TODO feed into throttled log, or count as unlogged + } + } + } + } + } + Assigned(_) => { + // TODO check if channel is healthy and alive + } + } } - } + WithStatusSeriesIdStateInner::NoAddress { since } => { + let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + if dt > NO_ADDRESS_STAY { + state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow }; + } + } + }, }, - ToRemove { .. } => { + ChannelStateValue::ToRemove { .. } => { // TODO if assigned to some address, } } @@ -885,21 +988,27 @@ impl Daemon { } } for (ch, st) in &mut self.channel_states { - if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since: _, did_send }) = &mut st.value { - if *did_send == false { - match self.search_tx.try_send(ch.id().into()) { - Ok(()) => { - *did_send = true; - SEARCH_REQ_SEND_COUNT.fetch_add(1, atomic::Ordering::AcqRel); - } - Err(e) => match e { - async_channel::TrySendError::Full(_) => {} - async_channel::TrySendError::Closed(_) => { - error!("Finder channel closed"); - // TODO recover from this. - panic!(); + if let ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId { + status_series_id: _, + state, + }) = &mut st.value + { + if let WithStatusSeriesIdStateInner::SearchPending { since: _, did_send } = &mut state.inner { + if *did_send == false { + match self.search_tx.try_send(ch.id().into()) { + Ok(()) => { + *did_send = true; + SEARCH_REQ_SEND_COUNT.fetch_add(1, atomic::Ordering::AcqRel); } - }, + Err(e) => match e { + async_channel::TrySendError::Full(_) => {} + async_channel::TrySendError::Closed(_) => { + error!("Finder channel closed"); + // TODO recover from this. + panic!(); + } + }, + } } } } @@ -914,26 +1023,30 @@ impl Daemon { for (_ch, st) in &self.channel_states { match &st.value { ChannelStateValue::Active(st) => match st { - ActiveChannelState::UnknownAddress { .. } => { - self.count_unknown_address += 1; - } - ActiveChannelState::SearchPending { did_send, .. } => { - self.count_search_pending += 1; - if *did_send { - self.count_search_sent += 1; + ActiveChannelState::Init { .. } => {} + ActiveChannelState::WaitForStatusSeriesId { .. } => {} + ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner { + WithStatusSeriesIdStateInner::UnknownAddress { .. } => { + self.count_unknown_address += 1; } - } - ActiveChannelState::WithAddress { state, .. } => match state { - WithAddressState::Unassigned { .. } => { - self.count_unassigned += 1; + WithStatusSeriesIdStateInner::SearchPending { did_send, .. } => { + self.count_search_pending += 1; + if *did_send { + self.count_search_sent += 1; + } } - WithAddressState::Assigned(_) => { - self.count_assigned += 1; + WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state { + WithAddressState::Unassigned { .. } => { + self.count_unassigned += 1; + } + WithAddressState::Assigned(_) => { + self.count_assigned += 1; + } + }, + WithStatusSeriesIdStateInner::NoAddress { .. } => { + self.count_no_address += 1; } }, - ActiveChannelState::NoAddress { .. } => { - self.count_no_address += 1; - } }, ChannelStateValue::ToRemove { .. } => {} } @@ -1037,7 +1150,7 @@ impl Daemon { fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> { if !self.channel_states.contains_key(&ch) { let st = ChannelState { - value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress { + value: ChannelStateValue::Active(ActiveChannelState::Init { since: SystemTime::now(), }), }; @@ -1050,20 +1163,31 @@ impl Daemon { if let Some(k) = self.channel_states.get_mut(&ch) { match &k.value { ChannelStateValue::Active(j) => match j { - ActiveChannelState::UnknownAddress { .. } => { + ActiveChannelState::Init { .. } => { k.value = ChannelStateValue::ToRemove { addr: None }; } - ActiveChannelState::SearchPending { .. } => { - k.value = ChannelStateValue::ToRemove { addr: None }; - } - ActiveChannelState::WithAddress { addr, .. } => { - k.value = ChannelStateValue::ToRemove { - addr: Some(addr.clone()), - }; - } - ActiveChannelState::NoAddress { .. } => { + ActiveChannelState::WaitForStatusSeriesId { .. } => { k.value = ChannelStateValue::ToRemove { addr: None }; } + ActiveChannelState::WithStatusSeriesId { + status_series_id: _, + state, + } => match state.inner { + WithStatusSeriesIdStateInner::UnknownAddress { .. } => { + k.value = ChannelStateValue::ToRemove { addr: None }; + } + WithStatusSeriesIdStateInner::SearchPending { .. } => { + k.value = ChannelStateValue::ToRemove { addr: None }; + } + WithStatusSeriesIdStateInner::WithAddress { addr, .. } => { + k.value = ChannelStateValue::ToRemove { + addr: Some(addr.clone()), + }; + } + WithStatusSeriesIdStateInner::NoAddress { .. } => { + k.value = ChannelStateValue::ToRemove { addr: None }; + } + }, }, ChannelStateValue::ToRemove { .. } => {} } @@ -1073,6 +1197,7 @@ impl Daemon { async fn handle_search_done(&mut self, item: Result, Error>) -> Result<(), Error> { //debug!("handle SearchDone: {res:?}"); + let allow_create_new_connections = self.allow_create_new_connections(); let tsnow = SystemTime::now(); match item { Ok(ress) => { @@ -1080,56 +1205,96 @@ impl Daemon { for res in ress { if let Some(addr) = &res.addr { self.stats.ioc_search_some_inc(); - if self.allow_create_new_connections() { - let ch = Channel::new(res.channel); - if let Some(st) = self.channel_states.get_mut(&ch) { - if let ChannelStateValue::Active(ActiveChannelState::SearchPending { - since, - did_send: _, - }) = &st.value - { - let dt = tsnow.duration_since(*since).unwrap(); - if dt > SEARCH_PENDING_TIMEOUT_WARN { - warn!( - " FOUND {:5.0} {:5.0} {addr}", - 1e3 * dt.as_secs_f32(), - 1e3 * res.dt.as_secs_f32() - ); - } - let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress { - addr: addr.clone(), - state: WithAddressState::Unassigned { assign_at: tsnow }, - }); - st.value = stnew; - } else { - warn!( - "address found, but state for {ch:?} is not SearchPending: {:?}", - st.value - ); - } - } else { - warn!("can not find channel state for {ch:?}"); + + let ch = Channel::new(res.channel); + if let Some(st) = self.channel_states.get_mut(&ch) { + match &st.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::Init { .. } => {} + ActiveChannelState::WaitForStatusSeriesId { .. } => {} + ActiveChannelState::WithStatusSeriesId { + status_series_id, + state, + } => match state.inner { + WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => { + if allow_create_new_connections { + let dt = tsnow.duration_since(since).unwrap(); + if dt > SEARCH_PENDING_TIMEOUT_WARN { + warn!( + " FOUND {:5.0} {:5.0} {addr}", + 1e3 * dt.as_secs_f32(), + 1e3 * res.dt.as_secs_f32() + ); + } + let stnew = + ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId { + status_series_id: status_series_id.clone(), + state: WithStatusSeriesIdState { + inner: WithStatusSeriesIdStateInner::WithAddress { + addr: addr.clone(), + state: WithAddressState::Unassigned { + assign_at: tsnow, + }, + }, + }, + }); + st.value = stnew; + } else { + // Emit something here? + } + } + _ => { + warn!( + "address found, but state for {ch:?} is not SearchPending: {:?}", + st.value + ); + } + }, + }, + ChannelStateValue::ToRemove { addr: _ } => {} } } else { - // Emit something here? + warn!("can not find channel state for {ch:?}"); } } else { //debug!("no addr from search in {res:?}"); let ch = Channel::new(res.channel); if let Some(st) = self.channel_states.get_mut(&ch) { - if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since, did_send: _ }) = - &st.value - { - let dt = tsnow.duration_since(*since).unwrap(); - if dt > SEARCH_PENDING_TIMEOUT_WARN { - warn!( - "NOT FOUND {:5.0} {:5.0}", - 1e3 * dt.as_secs_f32(), - 1e3 * res.dt.as_secs_f32() - ); - } - st.value = ChannelStateValue::Active(ActiveChannelState::NoAddress { since: tsnow }); - } else { + let mut unexpected_state = true; + match &st.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::Init { .. } => {} + ActiveChannelState::WaitForStatusSeriesId { .. } => {} + ActiveChannelState::WithStatusSeriesId { + status_series_id, + state: st3, + } => match &st3.inner { + WithStatusSeriesIdStateInner::UnknownAddress { .. } => {} + WithStatusSeriesIdStateInner::SearchPending { since, .. } => { + unexpected_state = false; + let dt = tsnow.duration_since(*since).unwrap(); + if dt > SEARCH_PENDING_TIMEOUT_WARN { + warn!( + "NOT FOUND {:5.0} {:5.0}", + 1e3 * dt.as_secs_f32(), + 1e3 * res.dt.as_secs_f32() + ); + } + st.value = + ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId { + status_series_id: status_series_id.clone(), + state: WithStatusSeriesIdState { + inner: WithStatusSeriesIdStateInner::NoAddress { since: tsnow }, + }, + }); + } + WithStatusSeriesIdStateInner::WithAddress { .. } => {} + WithStatusSeriesIdStateInner::NoAddress { .. } => {} + }, + }, + ChannelStateValue::ToRemove { .. } => {} + } + if unexpected_state { warn!("no address, but state for {ch:?} is not SearchPending: {:?}", st.value); } } else { @@ -1152,22 +1317,29 @@ impl Daemon { for (_k, v) in self.channel_states.iter_mut() { match &v.value { ChannelStateValue::Active(st2) => match st2 { - ActiveChannelState::UnknownAddress { .. } => {} - ActiveChannelState::SearchPending { since: _, did_send: _ } => {} - ActiveChannelState::WithAddress { addr, state: _ } => { - if addr == &conn_addr { - // TODO reset channel, emit log event for the connection addr only - //info!("ca conn down, reset {k:?}"); - *v = ChannelState { - value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress { - since: SystemTime::now(), - }), - }; + ActiveChannelState::WithStatusSeriesId { + status_series_id: _, + state: st3, + } => match &st3.inner { + WithStatusSeriesIdStateInner::UnknownAddress { .. } => {} + WithStatusSeriesIdStateInner::SearchPending { .. } => {} + WithStatusSeriesIdStateInner::WithAddress { addr, .. } => { + if addr == &conn_addr { + // TODO reset channel, emit log event for the connection addr only + //info!("ca conn down, reset {k:?}"); + *v = ChannelState { + value: ChannelStateValue::Active(ActiveChannelState::Init { + since: SystemTime::now(), + }), + }; + } } - } - ActiveChannelState::NoAddress { .. } => {} + WithStatusSeriesIdStateInner::NoAddress { .. } => {} + }, + ActiveChannelState::Init { .. } => {} + ActiveChannelState::WaitForStatusSeriesId { .. } => {} }, - ChannelStateValue::ToRemove { addr: _ } => {} + ChannelStateValue::ToRemove { .. } => {} } } let item = QueryItem::ConnectionStatus(ConnectionStatusItem { @@ -1176,7 +1348,7 @@ impl Daemon { status: ConnectionStatus::ConnectionHandlerDone, }); if let Some(tx) = self.ingest_commons.insert_item_queue.sender() { - if let Err(e) = tokio::time::timeout(Duration::from_millis(1000), tx.send(item)).await { + if let Err(_) = tokio::time::timeout(Duration::from_millis(1000), tx.send(item)).await { error!("timeout on insert queue send"); } else { } diff --git a/netfetch/src/batchquery/series_by_channel.rs b/netfetch/src/batchquery/series_by_channel.rs index d184208..87cf59e 100644 --- a/netfetch/src/batchquery/series_by_channel.rs +++ b/netfetch/src/batchquery/series_by_channel.rs @@ -29,7 +29,7 @@ impl ChannelInfoQuery { Self { backend: String::new(), channel: String::new(), - scalar_type: 4242, + scalar_type: -1, shape_dims: Vec::new(), tx: self.tx.clone(), } @@ -37,9 +37,8 @@ impl ChannelInfoQuery { } struct ChannelInfoResult { - series: Vec>, - tx: Vec, Error>>>, - missing: Vec, + series: Existence, + tx: Sender, Error>>, } struct PgRes { @@ -63,7 +62,10 @@ async fn prepare_pgcs(sql: &str, pgcn: usize, db: &Database) -> Result<(Sender

, pgres: PgRes) -> Result<(ChannelInfoResult, PgRes), Error> { +async fn select( + batch: Vec, + pgres: PgRes, +) -> Result<(Vec, Vec, PgRes), Error> { let mut backend = Vec::new(); let mut channel = Vec::new(); let mut scalar_type = Vec::new(); @@ -99,8 +101,7 @@ async fn select(batch: Vec, pgres: PgRes) -> Result<(ChannelIn Error::from(e.to_string()) }) { Ok(rows) => { - let mut series_ids = Vec::new(); - let mut txs = Vec::new(); + let mut result = Vec::new(); let mut missing = Vec::new(); let mut it1 = rows.into_iter(); let mut e1 = it1.next(); @@ -110,8 +111,11 @@ async fn select(batch: Vec, pgres: PgRes) -> Result<(ChannelIn if rid as u32 == qrid { let series: i64 = row.get(0); let series = SeriesId::new(series as _); - series_ids.push(Existence::Existing(series)); - txs.push(tx); + let res = ChannelInfoResult { + series: Existence::Existing(series), + tx, + }; + result.push(res); } e1 = it1.next(); } else { @@ -126,12 +130,7 @@ async fn select(batch: Vec, pgres: PgRes) -> Result<(ChannelIn missing.push(k); } } - let result = ChannelInfoResult { - series: series_ids, - tx: txs, - missing, - }; - Ok((result, pgres)) + Ok((result, missing, pgres)) } Err(e) => { error!("error in pg query {e}"); @@ -219,12 +218,12 @@ async fn insert_missing(batch: &Vec, pgres: PgRes) -> Result<( Ok(((), pgres)) } -async fn fetch_data(batch: Vec, pgres: PgRes) -> Result<(ChannelInfoResult, PgRes), Error> { - let (res1, pgres) = select(batch, pgres).await?; - if res1.missing.len() > 0 { - let ((), pgres) = insert_missing(&res1.missing, pgres).await?; - let (res2, pgres) = select(res1.missing, pgres).await?; - if res2.missing.len() > 0 { +async fn fetch_data(batch: Vec, pgres: PgRes) -> Result<(Vec, PgRes), Error> { + let (res1, missing, pgres) = select(batch, pgres).await?; + if missing.len() > 0 { + let ((), pgres) = insert_missing(&missing, pgres).await?; + let (res2, missing2, pgres) = select(missing, pgres).await?; + if missing2.len() > 0 { Err(Error::with_msg_no_trace("some series not found even after write")) } else { Ok((res2, pgres)) @@ -264,8 +263,8 @@ async fn run_queries( while let Some(item) = stream.next().await { match item { Ok(res) => { - for (sid, tx) in res.series.into_iter().zip(res.tx) { - match tx.send(Ok(sid)).await { + for r in res { + match r.tx.send(Ok(r.series)).await { Ok(_) => {} Err(e) => { // TODO count cases, but no log. Client may no longer be interested in this result. diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 4deb913..d574af8 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -9,6 +9,7 @@ use crate::batchquery::series_by_channel::ChannelInfoQuery; use crate::bsread::ChannelDescDecoded; use crate::ca::proto::CreateChan; use crate::ca::proto::EventAdd; +use crate::series::ChannelStatusSeriesId; use crate::series::Existence; use crate::series::SeriesId; use crate::store::ChannelInfoItem; @@ -132,6 +133,7 @@ enum MonitoringState { #[derive(Clone, Debug)] struct CreatedState { + cssid: ChannelStatusSeriesId, #[allow(unused)] cid: Cid, #[allow(unused)] @@ -156,8 +158,12 @@ struct CreatedState { #[allow(unused)] #[derive(Clone, Debug)] enum ChannelState { - Init, - Creating { cid: Cid, ts_beg: Instant }, + Init(ChannelStatusSeriesId), + Creating { + cssid: ChannelStatusSeriesId, + cid: Cid, + ts_beg: Instant, + }, Created(CreatedState), Error(ChannelError), Ended, @@ -166,7 +172,7 @@ enum ChannelState { impl ChannelState { fn to_info(&self, name: String, addr: SocketAddrV4) -> ChannelStateInfo { let channel_connected_info = match self { - ChannelState::Init => ChannelConnectedInfo::Disconnected, + ChannelState::Init(_) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, ChannelState::Created(_) => ChannelConnectedInfo::Connected, ChannelState::Error(_) => ChannelConnectedInfo::Error, @@ -222,11 +228,6 @@ impl ChannelState { } } -#[allow(unused)] -struct ChannelsStates { - channels: BTreeMap, -} - enum CaConnState { Unconnected, Connecting( @@ -284,7 +285,7 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 { #[derive(Debug)] pub enum ConnCommandKind { - ChannelAdd(String), + ChannelAdd(String, ChannelStatusSeriesId), ChannelRemove(String), CheckHealth, Shutdown, @@ -297,10 +298,10 @@ pub struct ConnCommand { } impl ConnCommand { - pub fn channel_add(name: String) -> Self { + pub fn channel_add(name: String, cssid: ChannelStatusSeriesId) -> Self { Self { id: Self::make_id(), - kind: ConnCommandKind::ChannelAdd(name), + kind: ConnCommandKind::ChannelAdd(name, cssid), } } @@ -362,7 +363,7 @@ pub struct CaConnEvent { #[derive(Debug)] pub enum ChannelSetOp { - Add, + Add(ChannelStatusSeriesId), Remove, } @@ -570,8 +571,8 @@ impl CaConn { // TODO return the result } - fn cmd_channel_add(&mut self, name: String) { - self.channel_add(name); + fn cmd_channel_add(&mut self, name: String, cssid: ChannelStatusSeriesId) { + self.channel_add(name, cssid); // TODO return the result //self.stats.caconn_command_can_not_reply_inc(); } @@ -583,7 +584,7 @@ impl CaConn { } fn cmd_shutdown(&mut self) { - self.trigger_shutdown(ChannelStatusClosedReason::IngestExit); + self.trigger_shutdown(ChannelStatusClosedReason::ShutdownCommand); } fn cmd_extra_inserts_conf(&mut self, extra_inserts_conf: ExtraInsertsConf) { @@ -603,8 +604,8 @@ impl CaConn { self.stats.caconn_loop3_count_inc(); match self.conn_command_rx.poll_next_unpin(cx) { Ready(Some(a)) => match a.kind { - ConnCommandKind::ChannelAdd(name) => { - self.cmd_channel_add(name); + ConnCommandKind::ChannelAdd(name, cssid) => { + self.cmd_channel_add(name, cssid); Ready(Some(Ok(()))) } ConnCommandKind::ChannelRemove(name) => { @@ -634,6 +635,7 @@ impl CaConn { fn channel_add_expl( channel: String, + cssid: ChannelStatusSeriesId, channels: &mut BTreeMap, cid_by_name: &mut BTreeMap, name_by_cid: &mut BTreeMap, @@ -647,15 +649,16 @@ impl CaConn { if channels.contains_key(&cid) { error!("logic error"); } else { - channels.insert(cid, ChannelState::Init); + channels.insert(cid, ChannelState::Init(cssid)); // TODO do not count, use separate queue for those channels. *init_state_count += 1; } } - pub fn channel_add(&mut self, channel: String) { + pub fn channel_add(&mut self, channel: String, cssid: ChannelStatusSeriesId) { Self::channel_add_expl( channel, + cssid, &mut self.channels, &mut self.cid_by_name, &mut self.name_by_cid, @@ -726,7 +729,7 @@ impl CaConn { let mut warn_max = 0; for (_cid, chst) in &mut self.channels { match chst { - ChannelState::Init => { + ChannelState::Init(cssid) => { *chst = ChannelState::Ended; } ChannelState::Creating { .. } => { @@ -828,7 +831,7 @@ impl CaConn { let mut not_alive_count = 0; for (_, st) in &self.channels { match st { - ChannelState::Creating { cid, ts_beg } => { + ChannelState::Creating { cid, ts_beg, cssid: _ } => { if false && tsnow.duration_since(*ts_beg) >= Duration::from_millis(10000) { let name = self.name_by_cid.get(cid); // TODO channel create timed out how to let daemon know? @@ -861,10 +864,14 @@ impl CaConn { let timenow = SystemTime::now(); for (_, st) in &mut self.channels { match st { - ChannelState::Init => { + ChannelState::Init(cssid) => { // TODO need last-save-ts for this state. } - ChannelState::Creating { cid: _, ts_beg: _ } => { + ChannelState::Creating { + cid: _, + ts_beg: _, + cssid: _, + } => { // TODO need last-save-ts for this state. } ChannelState::Created(st) => { @@ -930,7 +937,15 @@ impl CaConn { proto.push_out(msg); // TODO handle not-found error: let ch_s = self.channels.get_mut(&cid).unwrap(); + let cssid = match ch_s { + ChannelState::Creating { cssid, cid, ts_beg } => cssid.clone(), + _ => { + let e = Error::with_msg_no_trace("channel_to_evented bad state"); + return Err(e); + } + }; *ch_s = ChannelState::Created(CreatedState { + cssid, cid, sid, // TODO handle error better! Transition channel to Error state? @@ -1289,12 +1304,13 @@ impl CaConn { let keys: Vec = self.channels.keys().map(|x| *x).collect(); for cid in keys { match self.channels.get_mut(&cid).unwrap() { - ChannelState::Init => { + ChannelState::Init(cssid) => { + let cssid = cssid.clone(); let name = self .name_by_cid(cid) .ok_or_else(|| Error::with_msg_no_trace("name for cid not known")); let name = match name { - Ok(k) => k, + Ok(k) => k.to_string(), Err(e) => return Err(e), }; let msg = CaMsg { @@ -1307,6 +1323,7 @@ impl CaConn { // TODO handle not-found error: let ch_s = self.channels.get_mut(&cid).unwrap(); *ch_s = ChannelState::Creating { + cssid, cid, ts_beg: Instant::now(), }; @@ -1371,7 +1388,15 @@ impl CaConn { let shape = Shape::from_ca_count(k.data_count)?; // TODO handle not-found error: let ch_s = self.channels.get_mut(&cid).unwrap(); + let cssid = match ch_s { + ChannelState::Creating { cssid, cid, ts_beg } => cssid.clone(), + _ => { + let e = Error::with_msg_no_trace("channel_to_evented bad state"); + return Ready(Some(Err(e))); + } + }; *ch_s = ChannelState::Created(CreatedState { + cssid, cid, sid, scalar_type: scalar_type.clone(), @@ -1631,8 +1656,9 @@ impl CaConn { let map = std::mem::replace(&mut *g, BTreeMap::new()); for (ch, op) in map { match op { - ChannelSetOp::Add => Self::channel_add_expl( + ChannelSetOp::Add(cssid) => Self::channel_add_expl( ch, + cssid, res.channels, res.cid_by_name, res.name_by_cid, diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 9d21a8a..9b197e5 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -9,6 +9,7 @@ use crate::ca::conn::CaConnEventValue; use crate::errconv::ErrConv; use crate::rt::JoinHandle; use crate::rt::TokMx; +use crate::series::ChannelStatusSeriesId; use crate::store::CommonInsertItemQueue; use crate::store::CommonInsertItemQueueSender; use async_channel::Receiver; @@ -86,11 +87,10 @@ impl CaConnSet { insert_queue_max: usize, insert_item_queue_sender: CommonInsertItemQueueSender, data_store: Arc, - with_channels: Vec, ) -> Result { // TODO should we save this as event? trace!("create new CaConn {:?}", addr); - let mut conn = CaConn::new( + let conn = CaConn::new( backend.clone(), addr, local_epics_hostname, @@ -100,9 +100,6 @@ impl CaConnSet { array_truncate, insert_queue_max, ); - for ch in with_channels { - conn.channel_add(ch); - } let conn = conn; let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); @@ -247,6 +244,7 @@ impl CaConnSet { backend: String, addr: SocketAddrV4, name: String, + cssid: ChannelStatusSeriesId, insert_item_queue: &CommonInsertItemQueue, data_store: &Arc, insert_queue_max: usize, @@ -270,18 +268,17 @@ impl CaConnSet { .sender() .ok_or_else(|| Error::with_msg_no_trace("can not derive sender"))?, data_store.clone(), - Vec::new(), )?; g.insert(addr, ca_conn_ress); } match g.get(&addr) { Some(ca_conn) => { if true { - let op = super::conn::ChannelSetOp::Add; + let op = super::conn::ChannelSetOp::Add(cssid); ca_conn.channel_set_ops.insert(name, op); Ok(()) } else { - let cmd = ConnCommand::channel_add(name); + let cmd = ConnCommand::channel_add(name, cssid); let _cmdid = CmdId(addr, cmd.id()); ca_conn .sender diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs index 3e4a21a..49bc6b7 100644 --- a/netfetch/src/series.rs +++ b/netfetch/src/series.rs @@ -35,6 +35,19 @@ impl SeriesId { } } +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)] +pub struct ChannelStatusSeriesId(u64); + +impl ChannelStatusSeriesId { + pub fn new(id: u64) -> Self { + Self(id) + } + + pub fn id(&self) -> u64 { + self.0 + } +} + // TODO don't need byte_order or compression from ChannelDescDecoded for channel registration. pub async fn get_series_id( pg_client: &PgClient, diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index fb6c328..86d1707 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -183,7 +183,7 @@ pub struct ConnectionStatusItem { #[derive(Debug, Clone)] pub enum ChannelStatusClosedReason { - IngestExit, + ShutdownCommand, ChannelRemove, ProtocolError, FrequencyQuota, @@ -196,6 +196,7 @@ pub enum ChannelStatusClosedReason { #[derive(Debug)] pub enum ChannelStatus { + AssignedToAddress, Opened, Closed(ChannelStatusClosedReason), } @@ -205,9 +206,10 @@ impl ChannelStatus { use ChannelStatus::*; use ChannelStatusClosedReason::*; match self { + AssignedToAddress => 24, Opened => 1, Closed(x) => match x { - IngestExit => 2, + ShutdownCommand => 2, ChannelRemove => 3, ProtocolError => 4, FrequencyQuota => 5, @@ -225,7 +227,7 @@ impl ChannelStatus { use ChannelStatusClosedReason::*; let ret = match kind { 1 => Opened, - 2 => Closed(IngestExit), + 2 => Closed(ShutdownCommand), 3 => Closed(ChannelRemove), 4 => Closed(ProtocolError), 5 => Closed(FrequencyQuota), @@ -234,6 +236,7 @@ impl ChannelStatus { 8 => Closed(IocTimeout), 9 => Closed(NoProtocol), 10 => Closed(ProtocolDone), + 24 => AssignedToAddress, _ => { return Err(err::Error::with_msg_no_trace(format!( "unknown ChannelStatus kind {kind}"