Pass channel status series id

This commit is contained in:
Dominik Werder
2023-02-14 16:31:48 +01:00
parent 962dfe570e
commit 7a10a740f6
6 changed files with 447 additions and 237 deletions
+349 -177
View File
@@ -5,6 +5,7 @@ use err::Error;
use futures_util::FutureExt;
use futures_util::StreamExt;
use log::*;
use netfetch::batchquery::series_by_channel::ChannelInfoQuery;
use netfetch::ca::conn::CaConnEvent;
use netfetch::ca::conn::ConnCommand;
use netfetch::ca::connset::CaConnSet;
@@ -20,6 +21,11 @@ use netfetch::errconv::ErrConv;
use netfetch::insertworker::Ttls;
use netfetch::metrics::ExtraInsertsConf;
use netfetch::metrics::StatsSet;
use netfetch::series::ChannelStatusSeriesId;
use netfetch::series::Existence;
use netfetch::series::SeriesId;
use netfetch::store::ChannelStatus;
use netfetch::store::ChannelStatusItem;
use netfetch::store::CommonInsertItemQueue;
use netfetch::store::ConnectionStatus;
use netfetch::store::ConnectionStatusItem;
@@ -52,6 +58,7 @@ const FINDER_IN_FLIGHT_MAX: usize = 800;
const FINDER_BATCH_SIZE: usize = 8;
const CHECK_CHANS_PER_TICK: usize = 10000;
const CA_CONN_INSERT_QUEUE_MAX: usize = 256;
const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = i32::MIN + 1;
const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000);
const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000);
@@ -131,7 +138,7 @@ pub enum WithAddressState {
}
#[derive(Clone, Debug, Serialize)]
pub enum ActiveChannelState {
pub enum WithStatusSeriesIdStateInner {
UnknownAddress {
since: SystemTime,
},
@@ -149,6 +156,26 @@ pub enum ActiveChannelState {
},
}
#[derive(Clone, Debug, Serialize)]
pub struct WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner,
}
#[derive(Clone, Debug)]
pub enum ActiveChannelState {
Init {
since: SystemTime,
},
WaitForStatusSeriesId {
since: SystemTime,
rx: Receiver<Result<Existence<SeriesId>, Error>>,
},
WithStatusSeriesId {
status_series_id: ChannelStatusSeriesId,
state: WithStatusSeriesIdState,
},
}
#[derive(Debug)]
pub enum ChannelStateValue {
Active(ActiveChannelState),
@@ -259,6 +286,7 @@ pub struct Daemon {
stats: Arc<DaemonStats>,
shutting_down: bool,
insert_rx_weak: WeakReceiver<QueryItem>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
}
impl Daemon {
@@ -428,7 +456,7 @@ impl Daemon {
}
};
// TODO await on shutdown
let jh = tokio::spawn(fut);
let _jh = tokio::spawn(fut);
//let mut jhs = Vec::new();
//jhs.push(jh);
//futures_util::future::join_all(jhs).await;
@@ -460,6 +488,7 @@ impl Daemon {
stats: Arc::new(DaemonStats::new()),
shutting_down: false,
insert_rx_weak,
channel_info_query_tx,
};
Ok(ret)
}
@@ -758,26 +787,34 @@ impl Daemon {
for (_ch, st) in &self.channel_states {
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::UnknownAddress { since: _ } => {
ActiveChannelState::Init { .. } => {
without_address_count += 1;
}
ActiveChannelState::SearchPending { since: _, did_send: _ } => {
currently_search_pending += 1;
ActiveChannelState::WaitForStatusSeriesId { .. } => {
without_address_count += 1;
}
ActiveChannelState::WithAddress { addr: _, state } => match state {
WithAddressState::Unassigned { assign_at: _ } => {
with_address_count += 1;
ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
without_address_count += 1;
}
WithAddressState::Assigned(_) => {
with_address_count += 1;
WithStatusSeriesIdStateInner::SearchPending { .. } => {
currently_search_pending += 1;
without_address_count += 1;
}
WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state {
WithAddressState::Unassigned { .. } => {
with_address_count += 1;
}
WithAddressState::Assigned(_) => {
with_address_count += 1;
}
},
WithStatusSeriesIdStateInner::NoAddress { .. } => {
without_address_count += 1;
}
},
ActiveChannelState::NoAddress { since: _ } => {
without_address_count += 1;
}
},
ChannelStateValue::ToRemove { addr: _ } => {
ChannelStateValue::ToRemove { .. } => {
with_address_count += 1;
}
}
@@ -797,85 +834,151 @@ impl Daemon {
self.channel_states.range_mut(..)
};
let tsnow = SystemTime::now();
let mut attempt_series_search = true;
for (i, (ch, st)) in it.enumerate() {
use ActiveChannelState::*;
use ChannelStateValue::*;
match &mut st.value {
Active(st2) => match st2 {
UnknownAddress { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > UNKNOWN_ADDRESS_STAY {
//info!("UnknownAddress {} {:?}", i, ch);
if currently_search_pending < CURRENT_SEARCH_PENDING_MAX {
currently_search_pending += 1;
st.value = Active(SearchPending {
since: tsnow,
did_send: false,
});
SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel);
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { since: _ } => {
let (tx, rx) = async_channel::bounded(1);
let q = ChannelInfoQuery {
backend: self.ingest_commons.backend.clone(),
channel: ch.id().into(),
scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE,
shape_dims: Vec::new(),
tx,
};
if attempt_series_search {
match self.channel_info_query_tx.try_send(q) {
Ok(()) => {
*st2 = ActiveChannelState::WaitForStatusSeriesId { since: tsnow, rx };
}
Err(e) => match e {
_ => {
attempt_series_search = false;
}
},
}
}
}
SearchPending { since, did_send: _ } => {
//info!("SearchPending {} {:?}", i, ch);
ActiveChannelState::WaitForStatusSeriesId { since, rx } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > SEARCH_PENDING_TIMEOUT {
info!("Search timeout for {ch:?}");
st.value = Active(ActiveChannelState::NoAddress { since: tsnow });
currently_search_pending -= 1;
if dt > Duration::from_millis(5000) {
warn!("timeout can not get status series id");
*st2 = ActiveChannelState::Init { since: tsnow };
} else {
match rx.try_recv() {
Ok(x) => match x {
Ok(x) => {
//info!("received status series id: {x:?}");
*st2 = ActiveChannelState::WithStatusSeriesId {
status_series_id: ChannelStatusSeriesId::new(x.into_inner().id()),
state: WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow },
},
};
}
Err(e) => {
error!("could not get a status series id");
}
},
Err(e) => {}
}
}
}
WithAddress { addr, state } => {
//info!("WithAddress {} {:?}", i, ch);
use WithAddressState::*;
match state {
Unassigned { assign_at } => {
if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow {
let backend = self.opts.backend().into();
let channel_name = ch.id().into();
// This operation is meant to complete very quickly
self.ingest_commons
.ca_conn_set
.add_channel_to_addr(
backend,
*addr,
channel_name,
&self.common_insert_item_queue,
&self.datastore,
CA_CONN_INSERT_QUEUE_MAX,
self.opts.array_truncate,
self.opts.local_epics_hostname.clone(),
)
.slow_warn(2000)
.instrument(info_span!("add_channel_to_addr"))
.await?;
let cs = ConnectionState {
updated: tsnow,
value: ConnectionStateValue::Unconnected,
ActiveChannelState::WithStatusSeriesId {
status_series_id,
state,
} => match &mut state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > UNKNOWN_ADDRESS_STAY {
//info!("UnknownAddress {} {:?}", i, ch);
if currently_search_pending < CURRENT_SEARCH_PENDING_MAX {
currently_search_pending += 1;
state.inner = WithStatusSeriesIdStateInner::SearchPending {
since: tsnow,
did_send: false,
};
*state = WithAddressState::Assigned(cs);
self.connection_states.entry(*addr).or_insert_with(|| {
let t = CaConnState {
last_feedback: Instant::now(),
value: CaConnStateValue::Fresh,
};
t
});
SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel);
}
}
Assigned(_) => {
// TODO check if channel is healthy and alive
}
WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => {
//info!("SearchPending {} {:?}", i, ch);
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > SEARCH_PENDING_TIMEOUT {
info!("Search timeout for {ch:?}");
state.inner = WithStatusSeriesIdStateInner::NoAddress { since: tsnow };
currently_search_pending -= 1;
}
}
}
NoAddress { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > NO_ADDRESS_STAY {
st.value = Active(ActiveChannelState::UnknownAddress { since: tsnow });
WithStatusSeriesIdStateInner::WithAddress { addr, state } => {
//info!("WithAddress {} {:?}", i, ch);
use WithAddressState::*;
match state {
Unassigned { assign_at } => {
if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow {
let backend = self.opts.backend().into();
let channel_name = ch.id().into();
// This operation is meant to complete very quickly
self.ingest_commons
.ca_conn_set
.add_channel_to_addr(
backend,
*addr,
channel_name,
status_series_id.clone(),
&self.common_insert_item_queue,
&self.datastore,
CA_CONN_INSERT_QUEUE_MAX,
self.opts.array_truncate,
self.opts.local_epics_hostname.clone(),
)
.slow_warn(2000)
.instrument(info_span!("add_channel_to_addr"))
.await?;
let cs = ConnectionState {
updated: tsnow,
value: ConnectionStateValue::Unconnected,
};
*state = WithAddressState::Assigned(cs);
self.connection_states.entry(*addr).or_insert_with(|| {
let t = CaConnState {
last_feedback: Instant::now(),
value: CaConnStateValue::Fresh,
};
t
});
// TODO move await out of here
if let Some(tx) = self.ingest_commons.insert_item_queue.sender() {
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: tsnow,
series: SeriesId::new(status_series_id.id()),
status: ChannelStatus::AssignedToAddress,
});
match tx.send(item).await {
Ok(_) => {}
Err(_) => {
// TODO feed into throttled log, or count as unlogged
}
}
}
}
}
Assigned(_) => {
// TODO check if channel is healthy and alive
}
}
}
}
WithStatusSeriesIdStateInner::NoAddress { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > NO_ADDRESS_STAY {
state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow };
}
}
},
},
ToRemove { .. } => {
ChannelStateValue::ToRemove { .. } => {
// TODO if assigned to some address,
}
}
@@ -885,21 +988,27 @@ impl Daemon {
}
}
for (ch, st) in &mut self.channel_states {
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since: _, did_send }) = &mut st.value {
if *did_send == false {
match self.search_tx.try_send(ch.id().into()) {
Ok(()) => {
*did_send = true;
SEARCH_REQ_SEND_COUNT.fetch_add(1, atomic::Ordering::AcqRel);
}
Err(e) => match e {
async_channel::TrySendError::Full(_) => {}
async_channel::TrySendError::Closed(_) => {
error!("Finder channel closed");
// TODO recover from this.
panic!();
if let ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state,
}) = &mut st.value
{
if let WithStatusSeriesIdStateInner::SearchPending { since: _, did_send } = &mut state.inner {
if *did_send == false {
match self.search_tx.try_send(ch.id().into()) {
Ok(()) => {
*did_send = true;
SEARCH_REQ_SEND_COUNT.fetch_add(1, atomic::Ordering::AcqRel);
}
},
Err(e) => match e {
async_channel::TrySendError::Full(_) => {}
async_channel::TrySendError::Closed(_) => {
error!("Finder channel closed");
// TODO recover from this.
panic!();
}
},
}
}
}
}
@@ -914,26 +1023,30 @@ impl Daemon {
for (_ch, st) in &self.channel_states {
match &st.value {
ChannelStateValue::Active(st) => match st {
ActiveChannelState::UnknownAddress { .. } => {
self.count_unknown_address += 1;
}
ActiveChannelState::SearchPending { did_send, .. } => {
self.count_search_pending += 1;
if *did_send {
self.count_search_sent += 1;
ActiveChannelState::Init { .. } => {}
ActiveChannelState::WaitForStatusSeriesId { .. } => {}
ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
self.count_unknown_address += 1;
}
}
ActiveChannelState::WithAddress { state, .. } => match state {
WithAddressState::Unassigned { .. } => {
self.count_unassigned += 1;
WithStatusSeriesIdStateInner::SearchPending { did_send, .. } => {
self.count_search_pending += 1;
if *did_send {
self.count_search_sent += 1;
}
}
WithAddressState::Assigned(_) => {
self.count_assigned += 1;
WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state {
WithAddressState::Unassigned { .. } => {
self.count_unassigned += 1;
}
WithAddressState::Assigned(_) => {
self.count_assigned += 1;
}
},
WithStatusSeriesIdStateInner::NoAddress { .. } => {
self.count_no_address += 1;
}
},
ActiveChannelState::NoAddress { .. } => {
self.count_no_address += 1;
}
},
ChannelStateValue::ToRemove { .. } => {}
}
@@ -1037,7 +1150,7 @@ impl Daemon {
fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> {
if !self.channel_states.contains_key(&ch) {
let st = ChannelState {
value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress {
value: ChannelStateValue::Active(ActiveChannelState::Init {
since: SystemTime::now(),
}),
};
@@ -1050,20 +1163,31 @@ impl Daemon {
if let Some(k) = self.channel_states.get_mut(&ch) {
match &k.value {
ChannelStateValue::Active(j) => match j {
ActiveChannelState::UnknownAddress { .. } => {
ActiveChannelState::Init { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
ActiveChannelState::SearchPending { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
ActiveChannelState::WithAddress { addr, .. } => {
k.value = ChannelStateValue::ToRemove {
addr: Some(addr.clone()),
};
}
ActiveChannelState::NoAddress { .. } => {
ActiveChannelState::WaitForStatusSeriesId { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state,
} => match state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::SearchPending { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::WithAddress { addr, .. } => {
k.value = ChannelStateValue::ToRemove {
addr: Some(addr.clone()),
};
}
WithStatusSeriesIdStateInner::NoAddress { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
},
},
ChannelStateValue::ToRemove { .. } => {}
}
@@ -1073,6 +1197,7 @@ impl Daemon {
async fn handle_search_done(&mut self, item: Result<VecDeque<FindIocRes>, Error>) -> Result<(), Error> {
//debug!("handle SearchDone: {res:?}");
let allow_create_new_connections = self.allow_create_new_connections();
let tsnow = SystemTime::now();
match item {
Ok(ress) => {
@@ -1080,56 +1205,96 @@ impl Daemon {
for res in ress {
if let Some(addr) = &res.addr {
self.stats.ioc_search_some_inc();
if self.allow_create_new_connections() {
let ch = Channel::new(res.channel);
if let Some(st) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(ActiveChannelState::SearchPending {
since,
did_send: _,
}) = &st.value
{
let dt = tsnow.duration_since(*since).unwrap();
if dt > SEARCH_PENDING_TIMEOUT_WARN {
warn!(
" FOUND {:5.0} {:5.0} {addr}",
1e3 * dt.as_secs_f32(),
1e3 * res.dt.as_secs_f32()
);
}
let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress {
addr: addr.clone(),
state: WithAddressState::Unassigned { assign_at: tsnow },
});
st.value = stnew;
} else {
warn!(
"address found, but state for {ch:?} is not SearchPending: {:?}",
st.value
);
}
} else {
warn!("can not find channel state for {ch:?}");
let ch = Channel::new(res.channel);
if let Some(st) = self.channel_states.get_mut(&ch) {
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { .. } => {}
ActiveChannelState::WaitForStatusSeriesId { .. } => {}
ActiveChannelState::WithStatusSeriesId {
status_series_id,
state,
} => match state.inner {
WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => {
if allow_create_new_connections {
let dt = tsnow.duration_since(since).unwrap();
if dt > SEARCH_PENDING_TIMEOUT_WARN {
warn!(
" FOUND {:5.0} {:5.0} {addr}",
1e3 * dt.as_secs_f32(),
1e3 * res.dt.as_secs_f32()
);
}
let stnew =
ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId {
status_series_id: status_series_id.clone(),
state: WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::WithAddress {
addr: addr.clone(),
state: WithAddressState::Unassigned {
assign_at: tsnow,
},
},
},
});
st.value = stnew;
} else {
// Emit something here?
}
}
_ => {
warn!(
"address found, but state for {ch:?} is not SearchPending: {:?}",
st.value
);
}
},
},
ChannelStateValue::ToRemove { addr: _ } => {}
}
} else {
// Emit something here?
warn!("can not find channel state for {ch:?}");
}
} else {
//debug!("no addr from search in {res:?}");
let ch = Channel::new(res.channel);
if let Some(st) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since, did_send: _ }) =
&st.value
{
let dt = tsnow.duration_since(*since).unwrap();
if dt > SEARCH_PENDING_TIMEOUT_WARN {
warn!(
"NOT FOUND {:5.0} {:5.0}",
1e3 * dt.as_secs_f32(),
1e3 * res.dt.as_secs_f32()
);
}
st.value = ChannelStateValue::Active(ActiveChannelState::NoAddress { since: tsnow });
} else {
let mut unexpected_state = true;
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { .. } => {}
ActiveChannelState::WaitForStatusSeriesId { .. } => {}
ActiveChannelState::WithStatusSeriesId {
status_series_id,
state: st3,
} => match &st3.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {}
WithStatusSeriesIdStateInner::SearchPending { since, .. } => {
unexpected_state = false;
let dt = tsnow.duration_since(*since).unwrap();
if dt > SEARCH_PENDING_TIMEOUT_WARN {
warn!(
"NOT FOUND {:5.0} {:5.0}",
1e3 * dt.as_secs_f32(),
1e3 * res.dt.as_secs_f32()
);
}
st.value =
ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId {
status_series_id: status_series_id.clone(),
state: WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::NoAddress { since: tsnow },
},
});
}
WithStatusSeriesIdStateInner::WithAddress { .. } => {}
WithStatusSeriesIdStateInner::NoAddress { .. } => {}
},
},
ChannelStateValue::ToRemove { .. } => {}
}
if unexpected_state {
warn!("no address, but state for {ch:?} is not SearchPending: {:?}", st.value);
}
} else {
@@ -1152,22 +1317,29 @@ impl Daemon {
for (_k, v) in self.channel_states.iter_mut() {
match &v.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::UnknownAddress { .. } => {}
ActiveChannelState::SearchPending { since: _, did_send: _ } => {}
ActiveChannelState::WithAddress { addr, state: _ } => {
if addr == &conn_addr {
// TODO reset channel, emit log event for the connection addr only
//info!("ca conn down, reset {k:?}");
*v = ChannelState {
value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress {
since: SystemTime::now(),
}),
};
ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state: st3,
} => match &st3.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {}
WithStatusSeriesIdStateInner::SearchPending { .. } => {}
WithStatusSeriesIdStateInner::WithAddress { addr, .. } => {
if addr == &conn_addr {
// TODO reset channel, emit log event for the connection addr only
//info!("ca conn down, reset {k:?}");
*v = ChannelState {
value: ChannelStateValue::Active(ActiveChannelState::Init {
since: SystemTime::now(),
}),
};
}
}
}
ActiveChannelState::NoAddress { .. } => {}
WithStatusSeriesIdStateInner::NoAddress { .. } => {}
},
ActiveChannelState::Init { .. } => {}
ActiveChannelState::WaitForStatusSeriesId { .. } => {}
},
ChannelStateValue::ToRemove { addr: _ } => {}
ChannelStateValue::ToRemove { .. } => {}
}
}
let item = QueryItem::ConnectionStatus(ConnectionStatusItem {
@@ -1176,7 +1348,7 @@ impl Daemon {
status: ConnectionStatus::ConnectionHandlerDone,
});
if let Some(tx) = self.ingest_commons.insert_item_queue.sender() {
if let Err(e) = tokio::time::timeout(Duration::from_millis(1000), tx.send(item)).await {
if let Err(_) = tokio::time::timeout(Duration::from_millis(1000), tx.send(item)).await {
error!("timeout on insert queue send");
} else {
}
+22 -23
View File
@@ -29,7 +29,7 @@ impl ChannelInfoQuery {
Self {
backend: String::new(),
channel: String::new(),
scalar_type: 4242,
scalar_type: -1,
shape_dims: Vec::new(),
tx: self.tx.clone(),
}
@@ -37,9 +37,8 @@ impl ChannelInfoQuery {
}
struct ChannelInfoResult {
series: Vec<Existence<SeriesId>>,
tx: Vec<Sender<Result<Existence<SeriesId>, Error>>>,
missing: Vec<ChannelInfoQuery>,
series: Existence<SeriesId>,
tx: Sender<Result<Existence<SeriesId>, Error>>,
}
struct PgRes {
@@ -63,7 +62,10 @@ async fn prepare_pgcs(sql: &str, pgcn: usize, db: &Database) -> Result<(Sender<P
Ok((pgc_tx, pgc_rx))
}
async fn select(batch: Vec<ChannelInfoQuery>, pgres: PgRes) -> Result<(ChannelInfoResult, PgRes), Error> {
async fn select(
batch: Vec<ChannelInfoQuery>,
pgres: PgRes,
) -> Result<(Vec<ChannelInfoResult>, Vec<ChannelInfoQuery>, PgRes), Error> {
let mut backend = Vec::new();
let mut channel = Vec::new();
let mut scalar_type = Vec::new();
@@ -99,8 +101,7 @@ async fn select(batch: Vec<ChannelInfoQuery>, pgres: PgRes) -> Result<(ChannelIn
Error::from(e.to_string())
}) {
Ok(rows) => {
let mut series_ids = Vec::new();
let mut txs = Vec::new();
let mut result = Vec::new();
let mut missing = Vec::new();
let mut it1 = rows.into_iter();
let mut e1 = it1.next();
@@ -110,8 +111,11 @@ async fn select(batch: Vec<ChannelInfoQuery>, pgres: PgRes) -> Result<(ChannelIn
if rid as u32 == qrid {
let series: i64 = row.get(0);
let series = SeriesId::new(series as _);
series_ids.push(Existence::Existing(series));
txs.push(tx);
let res = ChannelInfoResult {
series: Existence::Existing(series),
tx,
};
result.push(res);
}
e1 = it1.next();
} else {
@@ -126,12 +130,7 @@ async fn select(batch: Vec<ChannelInfoQuery>, pgres: PgRes) -> Result<(ChannelIn
missing.push(k);
}
}
let result = ChannelInfoResult {
series: series_ids,
tx: txs,
missing,
};
Ok((result, pgres))
Ok((result, missing, pgres))
}
Err(e) => {
error!("error in pg query {e}");
@@ -219,12 +218,12 @@ async fn insert_missing(batch: &Vec<ChannelInfoQuery>, pgres: PgRes) -> Result<(
Ok(((), pgres))
}
async fn fetch_data(batch: Vec<ChannelInfoQuery>, pgres: PgRes) -> Result<(ChannelInfoResult, PgRes), Error> {
let (res1, pgres) = select(batch, pgres).await?;
if res1.missing.len() > 0 {
let ((), pgres) = insert_missing(&res1.missing, pgres).await?;
let (res2, pgres) = select(res1.missing, pgres).await?;
if res2.missing.len() > 0 {
async fn fetch_data(batch: Vec<ChannelInfoQuery>, pgres: PgRes) -> Result<(Vec<ChannelInfoResult>, PgRes), Error> {
let (res1, missing, pgres) = select(batch, pgres).await?;
if missing.len() > 0 {
let ((), pgres) = insert_missing(&missing, pgres).await?;
let (res2, missing2, pgres) = select(missing, pgres).await?;
if missing2.len() > 0 {
Err(Error::with_msg_no_trace("some series not found even after write"))
} else {
Ok((res2, pgres))
@@ -264,8 +263,8 @@ async fn run_queries(
while let Some(item) = stream.next().await {
match item {
Ok(res) => {
for (sid, tx) in res.series.into_iter().zip(res.tx) {
match tx.send(Ok(sid)).await {
for r in res {
match r.tx.send(Ok(r.series)).await {
Ok(_) => {}
Err(e) => {
// TODO count cases, but no log. Client may no longer be interested in this result.
+52 -26
View File
@@ -9,6 +9,7 @@ use crate::batchquery::series_by_channel::ChannelInfoQuery;
use crate::bsread::ChannelDescDecoded;
use crate::ca::proto::CreateChan;
use crate::ca::proto::EventAdd;
use crate::series::ChannelStatusSeriesId;
use crate::series::Existence;
use crate::series::SeriesId;
use crate::store::ChannelInfoItem;
@@ -132,6 +133,7 @@ enum MonitoringState {
#[derive(Clone, Debug)]
struct CreatedState {
cssid: ChannelStatusSeriesId,
#[allow(unused)]
cid: Cid,
#[allow(unused)]
@@ -156,8 +158,12 @@ struct CreatedState {
#[allow(unused)]
#[derive(Clone, Debug)]
enum ChannelState {
Init,
Creating { cid: Cid, ts_beg: Instant },
Init(ChannelStatusSeriesId),
Creating {
cssid: ChannelStatusSeriesId,
cid: Cid,
ts_beg: Instant,
},
Created(CreatedState),
Error(ChannelError),
Ended,
@@ -166,7 +172,7 @@ enum ChannelState {
impl ChannelState {
fn to_info(&self, name: String, addr: SocketAddrV4) -> ChannelStateInfo {
let channel_connected_info = match self {
ChannelState::Init => ChannelConnectedInfo::Disconnected,
ChannelState::Init(_) => ChannelConnectedInfo::Disconnected,
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
ChannelState::Created(_) => ChannelConnectedInfo::Connected,
ChannelState::Error(_) => ChannelConnectedInfo::Error,
@@ -222,11 +228,6 @@ impl ChannelState {
}
}
#[allow(unused)]
struct ChannelsStates {
channels: BTreeMap<Cid, ChannelState>,
}
enum CaConnState {
Unconnected,
Connecting(
@@ -284,7 +285,7 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 {
#[derive(Debug)]
pub enum ConnCommandKind {
ChannelAdd(String),
ChannelAdd(String, ChannelStatusSeriesId),
ChannelRemove(String),
CheckHealth,
Shutdown,
@@ -297,10 +298,10 @@ pub struct ConnCommand {
}
impl ConnCommand {
pub fn channel_add(name: String) -> Self {
pub fn channel_add(name: String, cssid: ChannelStatusSeriesId) -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::ChannelAdd(name),
kind: ConnCommandKind::ChannelAdd(name, cssid),
}
}
@@ -362,7 +363,7 @@ pub struct CaConnEvent {
#[derive(Debug)]
pub enum ChannelSetOp {
Add,
Add(ChannelStatusSeriesId),
Remove,
}
@@ -570,8 +571,8 @@ impl CaConn {
// TODO return the result
}
fn cmd_channel_add(&mut self, name: String) {
self.channel_add(name);
fn cmd_channel_add(&mut self, name: String, cssid: ChannelStatusSeriesId) {
self.channel_add(name, cssid);
// TODO return the result
//self.stats.caconn_command_can_not_reply_inc();
}
@@ -583,7 +584,7 @@ impl CaConn {
}
fn cmd_shutdown(&mut self) {
self.trigger_shutdown(ChannelStatusClosedReason::IngestExit);
self.trigger_shutdown(ChannelStatusClosedReason::ShutdownCommand);
}
fn cmd_extra_inserts_conf(&mut self, extra_inserts_conf: ExtraInsertsConf) {
@@ -603,8 +604,8 @@ impl CaConn {
self.stats.caconn_loop3_count_inc();
match self.conn_command_rx.poll_next_unpin(cx) {
Ready(Some(a)) => match a.kind {
ConnCommandKind::ChannelAdd(name) => {
self.cmd_channel_add(name);
ConnCommandKind::ChannelAdd(name, cssid) => {
self.cmd_channel_add(name, cssid);
Ready(Some(Ok(())))
}
ConnCommandKind::ChannelRemove(name) => {
@@ -634,6 +635,7 @@ impl CaConn {
fn channel_add_expl(
channel: String,
cssid: ChannelStatusSeriesId,
channels: &mut BTreeMap<Cid, ChannelState>,
cid_by_name: &mut BTreeMap<String, Cid>,
name_by_cid: &mut BTreeMap<Cid, String>,
@@ -647,15 +649,16 @@ impl CaConn {
if channels.contains_key(&cid) {
error!("logic error");
} else {
channels.insert(cid, ChannelState::Init);
channels.insert(cid, ChannelState::Init(cssid));
// TODO do not count, use separate queue for those channels.
*init_state_count += 1;
}
}
pub fn channel_add(&mut self, channel: String) {
pub fn channel_add(&mut self, channel: String, cssid: ChannelStatusSeriesId) {
Self::channel_add_expl(
channel,
cssid,
&mut self.channels,
&mut self.cid_by_name,
&mut self.name_by_cid,
@@ -726,7 +729,7 @@ impl CaConn {
let mut warn_max = 0;
for (_cid, chst) in &mut self.channels {
match chst {
ChannelState::Init => {
ChannelState::Init(cssid) => {
*chst = ChannelState::Ended;
}
ChannelState::Creating { .. } => {
@@ -828,7 +831,7 @@ impl CaConn {
let mut not_alive_count = 0;
for (_, st) in &self.channels {
match st {
ChannelState::Creating { cid, ts_beg } => {
ChannelState::Creating { cid, ts_beg, cssid: _ } => {
if false && tsnow.duration_since(*ts_beg) >= Duration::from_millis(10000) {
let name = self.name_by_cid.get(cid);
// TODO channel create timed out how to let daemon know?
@@ -861,10 +864,14 @@ impl CaConn {
let timenow = SystemTime::now();
for (_, st) in &mut self.channels {
match st {
ChannelState::Init => {
ChannelState::Init(cssid) => {
// TODO need last-save-ts for this state.
}
ChannelState::Creating { cid: _, ts_beg: _ } => {
ChannelState::Creating {
cid: _,
ts_beg: _,
cssid: _,
} => {
// TODO need last-save-ts for this state.
}
ChannelState::Created(st) => {
@@ -930,7 +937,15 @@ impl CaConn {
proto.push_out(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
let cssid = match ch_s {
ChannelState::Creating { cssid, cid, ts_beg } => cssid.clone(),
_ => {
let e = Error::with_msg_no_trace("channel_to_evented bad state");
return Err(e);
}
};
*ch_s = ChannelState::Created(CreatedState {
cssid,
cid,
sid,
// TODO handle error better! Transition channel to Error state?
@@ -1289,12 +1304,13 @@ impl CaConn {
let keys: Vec<Cid> = self.channels.keys().map(|x| *x).collect();
for cid in keys {
match self.channels.get_mut(&cid).unwrap() {
ChannelState::Init => {
ChannelState::Init(cssid) => {
let cssid = cssid.clone();
let name = self
.name_by_cid(cid)
.ok_or_else(|| Error::with_msg_no_trace("name for cid not known"));
let name = match name {
Ok(k) => k,
Ok(k) => k.to_string(),
Err(e) => return Err(e),
};
let msg = CaMsg {
@@ -1307,6 +1323,7 @@ impl CaConn {
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Creating {
cssid,
cid,
ts_beg: Instant::now(),
};
@@ -1371,7 +1388,15 @@ impl CaConn {
let shape = Shape::from_ca_count(k.data_count)?;
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
let cssid = match ch_s {
ChannelState::Creating { cssid, cid, ts_beg } => cssid.clone(),
_ => {
let e = Error::with_msg_no_trace("channel_to_evented bad state");
return Ready(Some(Err(e)));
}
};
*ch_s = ChannelState::Created(CreatedState {
cssid,
cid,
sid,
scalar_type: scalar_type.clone(),
@@ -1631,8 +1656,9 @@ impl CaConn {
let map = std::mem::replace(&mut *g, BTreeMap::new());
for (ch, op) in map {
match op {
ChannelSetOp::Add => Self::channel_add_expl(
ChannelSetOp::Add(cssid) => Self::channel_add_expl(
ch,
cssid,
res.channels,
res.cid_by_name,
res.name_by_cid,
+5 -8
View File
@@ -9,6 +9,7 @@ use crate::ca::conn::CaConnEventValue;
use crate::errconv::ErrConv;
use crate::rt::JoinHandle;
use crate::rt::TokMx;
use crate::series::ChannelStatusSeriesId;
use crate::store::CommonInsertItemQueue;
use crate::store::CommonInsertItemQueueSender;
use async_channel::Receiver;
@@ -86,11 +87,10 @@ impl CaConnSet {
insert_queue_max: usize,
insert_item_queue_sender: CommonInsertItemQueueSender,
data_store: Arc<DataStore>,
with_channels: Vec<String>,
) -> Result<CaConnRess, Error> {
// TODO should we save this as event?
trace!("create new CaConn {:?}", addr);
let mut conn = CaConn::new(
let conn = CaConn::new(
backend.clone(),
addr,
local_epics_hostname,
@@ -100,9 +100,6 @@ impl CaConnSet {
array_truncate,
insert_queue_max,
);
for ch in with_channels {
conn.channel_add(ch);
}
let conn = conn;
let conn_tx = conn.conn_command_tx();
let conn_stats = conn.stats();
@@ -247,6 +244,7 @@ impl CaConnSet {
backend: String,
addr: SocketAddrV4,
name: String,
cssid: ChannelStatusSeriesId,
insert_item_queue: &CommonInsertItemQueue,
data_store: &Arc<DataStore>,
insert_queue_max: usize,
@@ -270,18 +268,17 @@ impl CaConnSet {
.sender()
.ok_or_else(|| Error::with_msg_no_trace("can not derive sender"))?,
data_store.clone(),
Vec::new(),
)?;
g.insert(addr, ca_conn_ress);
}
match g.get(&addr) {
Some(ca_conn) => {
if true {
let op = super::conn::ChannelSetOp::Add;
let op = super::conn::ChannelSetOp::Add(cssid);
ca_conn.channel_set_ops.insert(name, op);
Ok(())
} else {
let cmd = ConnCommand::channel_add(name);
let cmd = ConnCommand::channel_add(name, cssid);
let _cmdid = CmdId(addr, cmd.id());
ca_conn
.sender
+13
View File
@@ -35,6 +35,19 @@ impl SeriesId {
}
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)]
pub struct ChannelStatusSeriesId(u64);
impl ChannelStatusSeriesId {
pub fn new(id: u64) -> Self {
Self(id)
}
pub fn id(&self) -> u64 {
self.0
}
}
// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration.
pub async fn get_series_id(
pg_client: &PgClient,
+6 -3
View File
@@ -183,7 +183,7 @@ pub struct ConnectionStatusItem {
#[derive(Debug, Clone)]
pub enum ChannelStatusClosedReason {
IngestExit,
ShutdownCommand,
ChannelRemove,
ProtocolError,
FrequencyQuota,
@@ -196,6 +196,7 @@ pub enum ChannelStatusClosedReason {
#[derive(Debug)]
pub enum ChannelStatus {
AssignedToAddress,
Opened,
Closed(ChannelStatusClosedReason),
}
@@ -205,9 +206,10 @@ impl ChannelStatus {
use ChannelStatus::*;
use ChannelStatusClosedReason::*;
match self {
AssignedToAddress => 24,
Opened => 1,
Closed(x) => match x {
IngestExit => 2,
ShutdownCommand => 2,
ChannelRemove => 3,
ProtocolError => 4,
FrequencyQuota => 5,
@@ -225,7 +227,7 @@ impl ChannelStatus {
use ChannelStatusClosedReason::*;
let ret = match kind {
1 => Opened,
2 => Closed(IngestExit),
2 => Closed(ShutdownCommand),
3 => Closed(ChannelRemove),
4 => Closed(ProtocolError),
5 => Closed(FrequencyQuota),
@@ -234,6 +236,7 @@ impl ChannelStatus {
8 => Closed(IocTimeout),
9 => Closed(NoProtocol),
10 => Closed(ProtocolDone),
24 => AssignedToAddress,
_ => {
return Err(err::Error::with_msg_no_trace(format!(
"unknown ChannelStatus kind {kind}"