Trigger build

This commit is contained in:
Dominik Werder
2023-09-07 12:18:56 +02:00
parent 9489d82bbb
commit 649011b2e2
9 changed files with 185 additions and 198 deletions

View File

@@ -159,6 +159,8 @@ struct CreatedState {
cid: Cid,
#[allow(unused)]
sid: u32,
data_type: u16,
data_count: u16,
scalar_type: ScalarType,
shape: Shape,
#[allow(unused)]
@@ -468,11 +470,8 @@ pub struct CaConn {
ioc_ping_start: Option<Instant>,
cmd_res_queue: VecDeque<ConnCommandResult>,
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
series_lookup_schedule: BTreeMap<Cid, ChannelInfoQuery>,
series_lookup_futs: FuturesUnordered<
Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sending: SenderPolling<ChannelInfoQuery>,
time_binners: BTreeMap<Cid, ConnTimeBin>,
ts_earliest_warn_poll_slow: Instant,
}
@@ -521,9 +520,8 @@ impl CaConn {
ioc_ping_start: None,
cmd_res_queue: VecDeque::new(),
ca_conn_event_out_queue: VecDeque::new(),
channel_info_query_tx,
series_lookup_schedule: BTreeMap::new(),
series_lookup_futs: FuturesUnordered::new(),
channel_info_query_queue: VecDeque::new(),
channel_info_query_sending: SenderPolling::new(channel_info_query_tx),
time_binners: BTreeMap::new(),
ts_earliest_warn_poll_slow: Instant::now(),
}
@@ -639,6 +637,49 @@ impl CaConn {
// TODO return the result
}
fn handle_series_lookup_result(
&mut self,
res: Result<ChannelInfoResult, dbpg::seriesbychannel::Error>,
) -> Result<(), Error> {
match res {
Ok(res) => {
let series = res.series.into_inner();
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
series: series.clone(),
status: ChannelStatus::Opened,
});
self.insert_item_queue.push_back(item);
if let Some(cid) = self.cid_by_name.get(&res.channel) {
if let Some(chst) = self.channels.get(cid) {
if let ChannelState::FetchingSeriesId(st2) = chst {
let cid = st2.cid.clone();
let sid = st2.sid;
let data_type = st2.data_type;
let data_count = st2.data_count;
match self.channel_to_evented(cid, sid, data_type, data_count, series) {
Ok(_) => {}
Err(e) => {
error!("channel_to_evented {e}");
}
}
} else {
warn!("TODO channel in bad state, reset");
}
} else {
warn!("TODO channel in bad state, reset");
}
} else {
warn!("TODO channel in bad state, reset");
}
}
Err(e) => {
error!("handle_series_lookup_result got error {e}");
}
}
Ok(())
}
fn handle_conn_command(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
// TODO if this loops for too long time, yield and make sure we get wake up again.
use Poll::*;
@@ -664,7 +705,10 @@ impl CaConn {
self.cmd_shutdown();
Ready(Some(Ok(())))
}
ConnCommandKind::SeriesLookupResult(_) => todo!("TODO handle SeriesLookupResult"),
ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) {
Ok(()) => Ready(Some(Ok(()))),
Err(e) => Ready(Some(Err(e))),
},
}
}
Ready(None) => {
@@ -912,15 +956,10 @@ impl CaConn {
sid: u32,
data_type: u16,
data_count: u16,
series: Existence<SeriesId>,
cx: &mut Context,
series: SeriesId,
) -> Result<(), Error> {
let tsnow = Instant::now();
self.stats.get_series_id_ok_inc();
let series = match series {
Existence::Created(k) => k,
Existence::Existing(k) => k,
};
if series.id() == 0 {
warn!("Weird series id: {series:?}");
}
@@ -962,6 +1001,8 @@ impl CaConn {
cssid,
cid,
sid,
data_type,
data_count,
scalar_type,
shape,
ts_created: tsnow,
@@ -980,72 +1021,10 @@ impl CaConn {
*ch_s = ChannelState::Created(series, created_state);
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
cx.waker().wake_by_ref();
error!("TODO channel_to_evented make sure we get polled again?");
Ok(())
}
fn emit_series_lookup(&mut self, cx: &mut Context) {
let _ = cx;
loop {
break if let Some(mut entry) = self.series_lookup_schedule.first_entry() {
todo!("emit_series_lookup");
#[cfg(DISABLED)]
{
let dummy = entry.get().dummy();
let query = std::mem::replace(entry.get_mut(), dummy);
match self.channel_info_query_tx.try_send(query) {
Ok(()) => {
entry.remove();
continue;
}
Err(e) => match e {
async_channel::TrySendError::Full(_) => {
warn!("series lookup channel full");
*entry.get_mut() = e.into_inner();
}
async_channel::TrySendError::Closed(_) => {
warn!("series lookup channel closed");
// *entry.get_mut() = e.into_inner();
entry.remove();
}
},
}
}
} else {
()
};
}
}
fn poll_channel_info_results(&mut self, cx: &mut Context) {
use Poll::*;
loop {
break match self.series_lookup_futs.poll_next_unpin(cx) {
Ready(Some(Ok((cid, sid, data_type, data_count, series)))) => {
{
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
series: series.clone().into_inner().into(),
status: ChannelStatus::Opened,
});
self.insert_item_queue.push_back(item);
}
match self.channel_to_evented(cid, sid, data_type, data_count, series, cx) {
Ok(_) => {}
Err(e) => {
error!("poll_channel_info_results {e}");
}
}
}
Ready(Some(Err(e))) => {
error!("poll_channel_info_results {e}");
}
Ready(None) => {}
Pending => {}
};
}
}
fn event_add_insert(
st: &mut CreatedState,
series: SeriesId,
@@ -1451,6 +1430,8 @@ impl CaConn {
cssid,
cid,
sid,
data_type: k.data_type,
data_count: k.data_count,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
ts_created: tsnow,
@@ -1468,37 +1449,17 @@ impl CaConn {
};
*ch_s = ChannelState::FetchingSeriesId(created_state);
// TODO handle error in different way. Should most likely not abort.
if !self.series_lookup_schedule.contains_key(&cid) {
let tx = SendSeriesLookup {
tx: self.conn_command_tx.clone(),
};
let query = ChannelInfoQuery {
backend: self.backend.clone(),
channel: name.clone(),
scalar_type: scalar_type.to_scylla_i32(),
shape_dims: shape.to_scylla_vec(),
tx: Box::pin(tx),
};
self.series_lookup_schedule.insert(cid, query);
todo!("TODO discover the series lookup from main command queue");
// let fut = async move {
// match rx.recv().await {
// Ok(item) => match item {
// Ok(item) => Ok((cid, sid, k.data_type, k.data_count, item)),
// Err(e) => Err(Error::with_msg_no_trace(e.to_string())),
// },
// Err(e) => {
// // TODO count only
// error!("can not receive series lookup result for {name} {e}");
// Err(Error::with_msg_no_trace("can not receive lookup result"))
// }
// }
// };
// self.series_lookup_futs.push(Box::pin(fut));
} else {
// TODO count only
warn!("series lookup for {name} already in progress");
}
let tx = SendSeriesLookup {
tx: self.conn_command_tx.clone(),
};
let query = ChannelInfoQuery {
backend: self.backend.clone(),
channel: name.clone(),
scalar_type: scalar_type.to_scylla_i32(),
shape_dims: shape.to_scylla_vec(),
tx: Box::pin(tx),
};
self.channel_info_query_queue.push_back(query);
do_wake_again = true;
}
CaMsgTy::EventAddRes(k) => {
@@ -1739,8 +1700,6 @@ impl CaConn {
}
fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
self.emit_series_lookup(cx);
self.poll_channel_info_results(cx);
let this = self.get_mut();
for (_, tb) in this.time_binners.iter_mut() {
let iiq = &mut this.insert_item_queue;
@@ -1793,6 +1752,22 @@ impl Stream for CaConn {
};
Ready(Some(Ok(ev)))
} else {
let _ = loop {
let sd = &mut self.channel_info_query_sending;
break if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(e)) => Ready(Some(e)),
Pending => Pending,
}
} else if let Some(item) = self.channel_info_query_queue.pop_front() {
let sd = &mut self.channel_info_query_sending;
sd.send2(item);
continue;
} else {
Ready(None)
};
};
let ret = loop {
self.stats.caconn_loop1_count_inc();
loop {

View File

@@ -244,38 +244,7 @@ impl CaConnSet {
ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x).await,
ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await,
ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await,
ConnSetCmd::IocAddrQueryResult(res) => {
for e in res {
if let Some(addr) = e.addr {
debug!("ioc found {e:?}");
let ch = Channel::new(e.channel.clone());
if let Some(chst) = self.channel_states.inner().get(&ch) {
if let ChannelStateValue::Active(ast) = &chst.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id,
state,
} = ast
{
let add = ChannelAddWithAddr {
backend: self.backend.clone(),
name: e.channel,
addr: SocketAddr::V4(addr),
cssid: status_series_id.clone(),
local_epics_hostname: self.local_epics_hostname.clone(),
};
} else {
warn!("TODO got address but no longer active");
}
} else {
warn!("TODO got address but no longer active");
}
} else {
warn!("ioc addr lookup done but channel no longer here");
}
}
}
Ok(())
}
ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await,
ConnSetCmd::CheckHealth => {
error!("TODO implement check health");
Ok(())
@@ -354,7 +323,7 @@ impl CaConnSet {
*chst2 = ActiveChannelState::WithStatusSeriesId {
status_series_id: add.cssid,
state: WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::NoAddress {
inner: WithStatusSeriesIdStateInner::SearchPending {
since: SystemTime::now(),
},
},
@@ -384,6 +353,51 @@ impl CaConnSet {
Ok(())
}
async fn handle_ioc_query_result(&mut self, res: VecDeque<FindIocRes>) -> Result<(), Error> {
for e in res {
let ch = Channel::new(e.channel.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
if let ChannelStateValue::Active(ast) = &mut chst.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id,
state,
} = ast
{
if let Some(addr) = e.addr {
debug!("ioc found {e:?}");
let add = ChannelAddWithAddr {
backend: self.backend.clone(),
name: e.channel,
addr: SocketAddr::V4(addr),
cssid: status_series_id.clone(),
local_epics_hostname: self.local_epics_hostname.clone(),
};
self.connset_tx
.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelAddWithAddr(add)))
.await?;
let since = SystemTime::now();
state.inner = WithStatusSeriesIdStateInner::WithAddress {
addr,
state: WithAddressState::Unassigned { since },
}
} else {
debug!("ioc not found {e:?}");
let since = SystemTime::now();
state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since };
}
} else {
warn!("TODO got address but no longer active");
}
} else {
warn!("TODO got address but no longer active");
}
} else {
warn!("ioc addr lookup done but channel no longer here");
}
}
Ok(())
}
fn create_ca_conn(&self, add: ChannelAddWithAddr) -> Result<CaConnRes, Error> {
// TODO should we save this as event?
let opts = CaConnOpts::default();
@@ -629,15 +643,12 @@ impl CaConnSet {
//info!("UnknownAddress {} {:?}", i, ch);
if (search_pending_count as usize) < CURRENT_SEARCH_PENDING_MAX {
search_pending_count += 1;
state.inner = WithStatusSeriesIdStateInner::SearchPending {
since: tsnow,
did_send: false,
};
state.inner = WithStatusSeriesIdStateInner::SearchPending { since: tsnow };
SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel);
}
}
}
WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => {
WithStatusSeriesIdStateInner::SearchPending { since } => {
//info!("SearchPending {} {:?}", i, ch);
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > SEARCH_PENDING_TIMEOUT {
@@ -650,7 +661,7 @@ impl CaConnSet {
//info!("WithAddress {} {:?}", i, ch);
use WithAddressState::*;
match state {
Unassigned { assign_at } => {
Unassigned { since } => {
// TODO do I need this case anymore?
#[cfg(DISABLED)]
if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow {
@@ -716,59 +727,51 @@ impl CaConnSet {
}
fn update_channel_state_counts(&mut self) -> (u64,) {
let mut unknown_address_count = 0;
let mut search_pending_count = 0;
let mut search_pending_did_send_count = 0;
let mut unassigned_count = 0;
let mut assigned_count = 0;
let mut no_address_count = 0;
let mut unknown_address = 0;
let mut search_pending = 0;
let mut unassigned = 0;
let mut assigned = 0;
let mut no_address = 0;
for (_ch, st) in self.channel_states.inner().iter() {
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { .. } => {
unknown_address_count += 1;
unknown_address += 1;
}
ActiveChannelState::WaitForStatusSeriesId { .. } => {
unknown_address_count += 1;
unknown_address += 1;
}
ActiveChannelState::WithStatusSeriesId { state, .. } => match &state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
unknown_address_count += 1;
unknown_address += 1;
}
WithStatusSeriesIdStateInner::SearchPending { did_send, .. } => {
if *did_send {
search_pending_did_send_count += 1;
} else {
search_pending_count += 1;
}
WithStatusSeriesIdStateInner::SearchPending { .. } => {
search_pending += 1;
}
WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state {
WithAddressState::Unassigned { .. } => {
unassigned_count += 1;
unassigned += 1;
}
WithAddressState::Assigned(_) => {
assigned_count += 1;
assigned += 1;
}
},
WithStatusSeriesIdStateInner::NoAddress { .. } => {
no_address_count += 1;
no_address += 1;
}
},
},
ChannelStateValue::ToRemove { .. } => {
unknown_address_count += 1;
unknown_address += 1;
}
}
}
use atomic::Ordering::Release;
self.stats.channel_unknown_address.store(unknown_address_count, Release);
self.stats.channel_search_pending.store(search_pending_count, Release);
self.stats
.search_pending_did_send
.store(search_pending_did_send_count, Release);
self.stats.unassigned.store(unassigned_count, Release);
self.stats.assigned.store(assigned_count, Release);
self.stats.channel_no_address.store(no_address_count, Release);
(search_pending_count,)
self.stats.channel_unknown_address.store(unknown_address, Release);
self.stats.channel_search_pending.store(search_pending, Release);
self.stats.channel_no_address.store(no_address, Release);
self.stats.channel_unassigned.store(unassigned, Release);
self.stats.channel_assigned.store(assigned, Release);
(search_pending,)
}
}

View File

@@ -53,9 +53,6 @@ macro_rules! trace_batch {
});
}
#[derive(Debug)]
pub struct IocAddrQueryResult {}
fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
let mut ret = VecDeque::new();
for row in rows {

View File

@@ -53,7 +53,7 @@ pub struct ConnectionState {
pub enum WithAddressState {
Unassigned {
//#[serde(with = "serde_Instant")]
assign_at: SystemTime,
since: SystemTime,
},
Assigned(ConnectionState),
}
@@ -66,7 +66,6 @@ pub enum WithStatusSeriesIdStateInner {
SearchPending {
//#[serde(with = "serde_Instant")]
since: SystemTime,
did_send: bool,
},
WithAddress {
addr: SocketAddrV4,