fix(pandabox): simplify enum states

This commit is contained in:
2026-01-13 11:48:12 +01:00
parent 7271de1654
commit bd8b3829dc

View File

@@ -32,7 +32,7 @@ import pandablocks.commands as pbc
from bec_lib import bec_logger
from ophyd.status import WaitTimeoutError
from pandablocks.blocking import BlockingClient
from pandablocks.responses import EndData, FrameData, ReadyData, StartData
from pandablocks.responses import Data, EndData, FrameData, ReadyData, StartData
from ophyd_devices import PSIDeviceBase, StatusBase
@@ -108,12 +108,11 @@ def load_layout_from_file_to_panda(host: str, file_path: str) -> None:
########################
class PandaDataEvents(StrEnum):
class PandaState(StrEnum):
"""
Events from the PandaBox data stream. The events READY, START, FRAME, END correspond to
actual data frames received from the PandaBox. The DISARMED event is used to indicate
that the PandaBox has been disarmed, either after a complete data acquisition or after
an interrupted acquisition.
States from the PandaBox data stream. The state READY, START, FRAME, END correspond to
actual data frames received from the PandaBox. DISARMED indicates that the PandaBox
has been disarmed and is no longer acquiring data.
"""
READY = "ready"
@@ -122,14 +121,19 @@ class PandaDataEvents(StrEnum):
END = "end"
DISARMED = "disarmed"
def describe(self) -> str:
"""Return a human-readable description of the event."""
descriptions = {
PandaState.READY: "PandaBox is ready for data acquisition.",
PandaState.START: "PandaBox has started data acquisition.",
PandaState.FRAME: "PandaBox has sent a frame of data.",
PandaState.END: "PandaBox has ended data acquisition.",
PandaState.DISARMED: "PandaBox is disarmed and not acquiring data. This event is not triggered by a data frame from the PandaBox.",
}
return descriptions.get(self, "Unknown PandaBox data event.")
LITERAL_PANDA_DATA_EVENTS: TypeAlias = Union[
PandaDataEvents.READY.value,
PandaDataEvents.START.value,
PandaDataEvents.FRAME.value,
PandaDataEvents.END.value,
]
# pylint: disable=invalid-name
LITERAL_PANDA_COMMANDS: TypeAlias = Union[
pbc.Raw,
pbc.Arm,
@@ -139,7 +143,6 @@ LITERAL_PANDA_COMMANDS: TypeAlias = Union[
pbc.GetFieldInfo,
pbc.GetPcapBitsLabels,
]
LITERAL_PANDA_DATA: TypeAlias = Union[ReadyData, StartData, FrameData, EndData]
@@ -151,12 +154,7 @@ class PandaBox(PSIDeviceBase):
"""
USER_ACCESS = [
"send_raw",
"add_status_callback",
"remove_status_callback",
"get_panda_data_state",
]
USER_ACCESS = ["send_raw", "add_status_callback", "remove_status_callback", "get_panda_state"]
def __init__(
self,
@@ -172,7 +170,7 @@ class PandaBox(PSIDeviceBase):
# Lock
self._lock = threading.RLock()
self._panda_data_state: PandaDataEvents | str = PandaDataEvents.DISARMED.value
self._panda_state: PandaState | str = PandaState.DISARMED.value
# Status callback management
self._status_callbacks: dict[uuid.UUID, dict[str, Any]] = {}
@@ -215,25 +213,25 @@ class PandaBox(PSIDeviceBase):
def add_status_callback(
self,
status: StatusBase,
success: list[PandaDataEvents],
failure: list[PandaDataEvents],
success: list[PandaState],
failure: list[PandaState],
check_directly: bool = True,
) -> str:
"""
This methods registers a status callback to the data receiving loop that will resolve
if the PandaBox receives specific data events. It is used to allow asynchronous resolution
of status objects based on PandaBox events. Per default, the callback checks the current
panda_data_state directly to see if the status can be resolved immediately. This is useful
panda_state directly to see if the status can be resolved immediately. This is useful
when the status is created after the PandaBox has already sent some data events. However, this
can also be disabled by setting check_directly to False.
Args:
status (StatusBase): The status object to register the callback for.
success (list[PandaDataEvents]): The list of PandaBox data events that will resolve
success (list[PandaState]): The list of PandaBox data events that will resolve
the status as successful.
failure (list[PandaDataEvents]): The list of PandaBox data events that will resolve
failure (list[PandaState]): The list of PandaBox data events that will resolve
the status as failed.
check_directly (bool): Whether to check the current panda_data_state directly
check_directly (bool): Whether to check the current panda_state directly
to resolve the status immediately. Defaults to True.
Returns:
@@ -242,7 +240,7 @@ class PandaBox(PSIDeviceBase):
"""
with self._lock:
if check_directly:
current_state = self.panda_data_state
current_state = self.panda_state
if current_state in success and not status.done:
status.set_finished()
return ""
@@ -275,7 +273,7 @@ class PandaBox(PSIDeviceBase):
def add_data_callback(
self,
callback: Callable[[LITERAL_PANDA_DATA], None],
data_type: LITERAL_PANDA_DATA_EVENTS = PandaDataEvents.FRAME.value,
data_type: PandaState = PandaState.FRAME.value,
) -> str:
"""
Register a data callback to be called whenever new data is received from the PandaBox.
@@ -304,28 +302,28 @@ class PandaBox(PSIDeviceBase):
with self._lock:
self._data_callbacks.pop(cb_id, None)
def get_panda_data_state(self) -> str:
def get_panda_state(self) -> str:
"""Get current panda data state."""
return self.panda_data_state
return self.panda_state
#########################
### State management ###
#########################
@property
def panda_data_state(self) -> str:
def panda_state(self) -> str:
"""Get the current state of the data acquisition on the PandaBox."""
return (
self._panda_data_state.value
if isinstance(self._panda_data_state, PandaDataEvents)
else self._panda_data_state
self._panda_state.value
if isinstance(self._panda_state, PandaState)
else self._panda_state
)
@panda_data_state.setter
def panda_data_state(self, value: PandaDataEvents | str) -> None:
@panda_state.setter
def panda_state(self, value: PandaState | str) -> None:
"""Set the current state of the data acquisition on the PandaBox."""
with self._lock:
self._panda_data_state = value
self._panda_state = value
################################
### Data readout management ###
@@ -353,7 +351,7 @@ class PandaBox(PSIDeviceBase):
- FrameData: Contains a frame of data acquired from the PandaBox.
- EndData: Indicates the end of a data acquisition.
Upon receiving each type of data message, the panda_data_state is updated accordingly,
Upon receiving each type of data message, the panda_state is updated accordingly,
and any registered callbacks for that event are executed. This allows to handle callbacks
for each stage of the data acquisition process. For example, a child class could add a
status callback to resolve during a specific stage of the data acquisition based on an
@@ -370,20 +368,20 @@ class PandaBox(PSIDeviceBase):
try:
for data in client.data(scaled=False):
if isinstance(data, ReadyData):
self._run_status_callbacks(PandaDataEvents.READY)
self._run_data_callbacks(data, PandaDataEvents.READY.value)
self._run_status_callbacks(PandaState.READY)
self._run_data_callbacks(data, PandaState.READY.value)
elif isinstance(data, StartData):
self._run_status_callbacks(PandaDataEvents.START)
self._run_data_callbacks(data, PandaDataEvents.START.value)
self._run_status_callbacks(PandaState.START)
self._run_data_callbacks(data, PandaState.START.value)
elif isinstance(data, FrameData):
self._run_status_callbacks(PandaDataEvents.FRAME)
self._run_data_callbacks(data, PandaDataEvents.FRAME.value)
self._run_status_callbacks(PandaState.FRAME)
self._run_data_callbacks(data, PandaState.FRAME.value)
elif isinstance(data, EndData):
self._run_status_callbacks(PandaDataEvents.END)
self._run_data_callbacks(data, PandaDataEvents.END.value)
self._run_status_callbacks(PandaState.END)
self._run_data_callbacks(data, PandaState.END.value)
break # Exit data readout loop
finally:
@@ -396,11 +394,19 @@ class PandaBox(PSIDeviceBase):
# expected safe state of the data receiving loop from the PandaBox and was added
# in addition to the existing READY, START, FRAME, END events created from the existing
# PandaBox data messages.
client.send(self._disarm()) # Ensure we disarm at the end
self.data_thread_run_event.clear()
self._run_status_callbacks(PandaDataEvents.DISARMED)
def _run_status_callbacks(self, event: PandaDataEvents) -> None:
client.send(self._disarm()) # Ensure we disarm at the end
self.data_thread_run_event.clear() # Stop data readout loop
self._run_status_callbacks(PandaState.DISARMED) # Run DISARMED status callbacks
# As DISARMED is not triggered by a data message, we manually run data callbacks for it here
# and run it with an empty Data() object following the base class for data message responses
# of the pandablocks library.
self._run_data_callbacks(Data(), PandaState.DISARMED.value)
def _run_status_callbacks(self, event: PandaState) -> None:
"""
Run registered status callbacks for a given PandaBox data event.
These callbacks are used to resolve status objects that are registered
@@ -411,16 +417,16 @@ class PandaBox(PSIDeviceBase):
NOTE : Status callbacks are removed once they are resolved (either success or failure).
Args:
event (PandaDataEvents): The PandaBox data event that occurred.
event (PandaState): The PandaBox data event that occurred.
data (LITERAL_PANDA_DATA): The data associated with the event.
"""
self.panda_data_state = event
self.panda_state = event
with self._lock:
callbacks_to_remove = []
for cb_id, cb_info in self._status_callbacks.items():
status: StatusBase = cb_info["status"]
success_events: list[PandaDataEvents] = cb_info["success"]
failure_events: list[PandaDataEvents] = cb_info["failure"]
success_events: list[PandaState] = cb_info["success"]
failure_events: list[PandaState] = cb_info["failure"]
if event in success_events and not status.done:
status.set_finished()
@@ -436,9 +442,7 @@ class PandaBox(PSIDeviceBase):
for cb_id in callbacks_to_remove:
self._status_callbacks.pop(cb_id, None)
def _run_data_callbacks(
self, data: LITERAL_PANDA_DATA, event_type: LITERAL_PANDA_DATA_EVENTS
) -> None:
def _run_data_callbacks(self, data: LITERAL_PANDA_DATA, event_type: PandaState) -> None:
"""
Placeholder method to run data callbacks for received PandaBox data.
Child classes can override this method to implement custom behavior
@@ -448,13 +452,13 @@ class PandaBox(PSIDeviceBase):
Args:
data (LITERAL_PANDA_DATA): The data received from the PandaBox.
event_type (LITERAL_PANDA_DATA_EVENTS): The type of data received. This can be
event_type (PandaState): The type of data received. This can be
"ready", "start", "frame", or "end".
"""
with self._lock:
for cb_info in self._data_callbacks.values():
callback: Callable[[LITERAL_PANDA_DATA], None] = cb_info["callback"]
cb_data_type: LITERAL_PANDA_DATA_EVENTS = cb_info["data_type"]
cb_data_type: PandaState = cb_info["data_type"]
if cb_data_type == event_type:
callback(data)
@@ -518,7 +522,7 @@ class PandaBox(PSIDeviceBase):
"""
# First make sure that the data readout loop is not running
status = StatusBase(obj=self)
self.add_status_callback(status=status, success=[PandaDataEvents.DISARMED], failure=[])
self.add_status_callback(status=status, success=[PandaState.DISARMED], failure=[])
try:
status.wait(timeout=3)
except WaitTimeoutError:
@@ -542,8 +546,8 @@ class PandaBox(PSIDeviceBase):
status_ready_data_received = StatusBase(obj=self)
self.add_status_callback(
status=status_ready_data_received,
success=[PandaDataEvents.READY],
failure=[PandaDataEvents.FRAME, PandaDataEvents.END],
success=[PandaState.READY],
failure=[PandaState.FRAME, PandaState.END],
)
status_ready_data_received.add_callback(self._pre_scan_status_callback)
if status: