1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-04 16:02:51 +01:00

refactor: rename to beamline states

This commit is contained in:
2026-02-05 09:45:19 +01:00
parent d0d14ae78f
commit 2cbe2a4542

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messages import BeamlineConditionUpdateEntry
from bec_lib.messages import BeamlineStateConfig
from qtpy.QtCore import QObject, QTimer, Signal
from bec_widgets.utils.bec_connector import BECConnector
@@ -14,12 +14,12 @@ logger = bec_logger.logger
class BECStatusBroker(BECConnector, QObject):
"""Listen to BEC beamline condition endpoints and emit structured signals."""
"""Listen to BEC beamline state endpoints and emit structured signals."""
_instance: "BECStatusBroker | None" = None
_initialized: bool = False
available_updated = Signal(list) # list of conditions available
available_updated = Signal(list) # list of states available
status_updated = Signal(str, dict) # name, status update
def __new__(cls, *args, **kwargs):
@@ -33,7 +33,7 @@ class BECStatusBroker(BECConnector, QObject):
super().__init__(parent=parent, gui_id=gui_id, client=client, **kwargs)
self._watched: set[str] = set()
self.bec_dispatcher.connect_slot(
self.on_available, MessageEndpoints.available_beamline_conditions()
self.on_available, MessageEndpoints.available_beamline_states()
)
self._initialized = True
@@ -42,7 +42,7 @@ class BECStatusBroker(BECConnector, QObject):
def refresh_available(self):
"""Fetch the current set of beamline conditions once."""
try:
msg = self.client.connector.get_last(MessageEndpoints.available_beamline_conditions())
msg = self.client.connector.get_last(MessageEndpoints.available_beamline_states())
logger.info(f"StatusBroker: fetched available conditions payload: {msg}")
if msg:
self.on_available(msg.get("data").content, None)
@@ -51,40 +51,40 @@ class BECStatusBroker(BECConnector, QObject):
@SafeSlot(dict, dict)
def on_available(self, data: dict, meta: dict | None = None):
condition_list = data.get("conditions") # latest one from the stream
self.available_updated.emit(condition_list)
for condition in condition_list:
name = condition.name
state_list = data.get("states") # latest one from the stream
self.available_updated.emit(state_list)
for state in state_list:
name = state.name
if name:
self.watch_condition(name)
self.watch_state(name)
def watch_condition(self, name: str):
"""Subscribe to updates for a single beamline condition."""
def watch_state(self, name: str):
"""Subscribe to updates for a single beamline state."""
if name in self._watched:
return
self._watched.add(name)
endpoint = MessageEndpoints.beamline_condition(name)
logger.info(f"StatusBroker: watching condition '{name}' on {endpoint.endpoint}")
self.bec_dispatcher.connect_slot(self.on_condition, endpoint)
self.fetch_condition(name)
endpoint = MessageEndpoints.beamline_state(name)
logger.info(f"StatusBroker: watching state '{name}' on {endpoint.endpoint}")
self.bec_dispatcher.connect_slot(self.on_state, endpoint)
self.fetch_state(name)
def fetch_condition(self, name: str):
"""Fetch the current value of a beamline condition once."""
endpoint = MessageEndpoints.beamline_condition(name)
def fetch_state(self, name: str):
"""Fetch the current value of a beamline state once."""
endpoint = MessageEndpoints.beamline_state(name)
try:
msg = self.client.connector.get_last(endpoint)
logger.info(f"StatusBroker: fetched condition '{name}' payload: {msg}")
logger.info(f"StatusBroker: fetched state '{name}' payload: {msg}")
if msg:
self.on_condition(msg.get("data").content, None)
self.on_state(msg.get("data").content, None)
except Exception as exc: # pragma: no cover - runtime env
logger.debug(f"Could not fetch condition {name}: {exc}")
logger.debug(f"Could not fetch state {name}: {exc}")
@SafeSlot(dict, dict)
def on_condition(self, data: dict, meta: dict | None = None):
def on_state(self, data: dict, meta: dict | None = None):
name = data.get("name")
if not name:
return
logger.info(f"StatusBroker: condition update for '{name}' -> {data}")
logger.info(f"StatusBroker: state update for '{name}' -> {data}")
self.status_updated.emit(str(name), data)
@classmethod
@@ -97,12 +97,12 @@ class BECStatusBroker(BECConnector, QObject):
class StatusToolBar(ModularToolBar):
"""Status toolbar that auto-manages beamline condition indicators."""
"""Status toolbar that auto-manages beamline state indicators."""
STATUS_MAP: dict[str, StatusState] = {
"normal": StatusState.SUCCESS,
"valid": StatusState.SUCCESS,
"warning": StatusState.WARNING,
"alarm": StatusState.EMERGENCY,
"invalid": StatusState.EMERGENCY,
}
def __init__(self, parent=None, names: list[str] | None = None, **kwargs):
@@ -132,7 +132,7 @@ class StatusToolBar(ModularToolBar):
self.add_status_item(
name=name, text=name, state=StatusState.DEFAULT, tooltip=None
)
self.broker.watch_condition(name)
self.broker.watch_state(name)
def _apply_status_toolbar_style(self) -> None:
self.setStyleSheet(
@@ -145,19 +145,19 @@ class StatusToolBar(ModularToolBar):
# -------- Slots for updates --------
@SafeSlot(list)
def on_available_updated(self, available_conditions: list):
"""Process the available conditions stream and start watching them."""
def on_available_updated(self, available_states: list):
"""Process the available states stream and start watching them."""
# Keep track of current names from the broker to remove stale ones.
current_names: set[str] = set()
for condition in available_conditions:
if not isinstance(condition, BeamlineConditionUpdateEntry):
for state in available_states:
if not isinstance(state, BeamlineStateConfig):
continue
name = condition.name
title = condition.title or name
name = state.name
title = state.title or name
if not name:
continue
current_names.add(name)
logger.info(f"StatusToolbar: discovered condition '{name}' title='{title}'")
logger.info(f"StatusToolbar: discovered state '{name}' title='{title}'")
# auto-add unless filtered out
if self.allowed_names is None or name in self.allowed_names:
self.add_status_item(name=name, text=title, state=StatusState.DEFAULT, tooltip=None)
@@ -168,22 +168,22 @@ class StatusToolBar(ModularToolBar):
if act and act.action:
act.action.setVisible(False)
# Remove actions that are no longer present in available_conditions.
# Remove actions that are no longer present in available_states.
known_actions = [
n for n in self.components._components.keys() if n not in ("separator",)
] # direct access used for clean-up
for name in known_actions:
if name not in current_names:
logger.info(f"StatusToolbar: removing stale condition '{name}'")
logger.info(f"StatusToolbar: removing stale state '{name}'")
try:
self.components.remove_action(name)
except Exception as exc:
logger.warning(f"Failed to remove stale condition '{name}': {exc}")
logger.warning(f"Failed to remove stale state '{name}': {exc}")
self.refresh()
@SafeSlot(str, dict)
def on_status_updated(self, name: str, payload: dict): # TODO finish update logic
"""Update a status pill when a condition update arrives."""
"""Update a status pill when a state update arrives."""
state = self.STATUS_MAP.get(str(payload.get("status", "")).lower(), StatusState.DEFAULT)
action = self.components.get_action(name) if self.components.exists(name) else None
@@ -193,12 +193,12 @@ class StatusToolBar(ModularToolBar):
if text is None and action is None:
text = payload.get("name") or name
if "message" in payload:
tooltip = payload.get("message") or ""
if "label" in payload:
tooltip = payload.get("label") or ""
else:
tooltip = None
logger.info(
f"StatusToolbar: update condition '{name}' -> state={state} text='{text}' tooltip='{tooltip}'"
f"StatusToolbar: update state '{name}' -> state={state} text='{text}' tooltip='{tooltip}'"
)
self.set_status(name=name, text=text, state=state, tooltip=tooltip)
@@ -249,7 +249,7 @@ class StatusToolBar(ModularToolBar):
self.components.add_safe(name, action)
self.get_bundle("status").add_action(name)
self.refresh()
self.broker.fetch_condition(name)
self.broker.fetch_state(name)
return action
def set_status(