From a3e27419afb0e8515667edf887e5fdc516022abf Mon Sep 17 00:00:00 2001 From: appel_c Date: Mon, 16 Feb 2026 21:30:58 +0100 Subject: [PATCH] fix(pandabox): improve docs and readme --- ophyd_devices/devices/panda_box/README.md | 6 +- ophyd_devices/devices/panda_box/panda_box.py | 106 ++++++++----------- 2 files changed, 49 insertions(+), 63 deletions(-) diff --git a/ophyd_devices/devices/panda_box/README.md b/ophyd_devices/devices/panda_box/README.md index 36cce9a..436d1c3 100644 --- a/ophyd_devices/devices/panda_box/README.md +++ b/ophyd_devices/devices/panda_box/README.md @@ -5,11 +5,11 @@ Short Doumentation of the PandaBox Device Integration in Ophyd Devices The PandaBox integration provides a base class for interfacing with PandaBox hardware from Diamond Light Source. This implementation wraps the PandaBox hardware as a `PSIDeviceBase` device, integrating it into the BEC scan interface. It uses the `pandablocks` library for communication and data acquisition. Beamline-specific implementations should use the *on_hook* methods from PSIDeviceBase to implement custom logic. -**IMPORTANT** : If the `on_connected()` method is overridden by a child class, it must always call `super().on_connected()` first to ensure proper initialization of the PandaBox device. This is implemented in the [PandaBox class](./panda_box.py). +**IMPORTANT** : If the `on_connected()` method is overridden by a child class, it must always call `super().on_connected()` first to ensure proper initialization of the PandaBox device. This is implemented in the [PandaBox class](./panda_box.py). The same is true for all the other *on_hook* methods, which may contain important logic for the proper setup for the scan. Only skip them if you are sure this logic is not required. ### PandaState -The PandaBox has a PCAP module that can be used to record block values. The base integration implements logic that automatically arms/disarms the PCAP module for the BEC scan interface. Callbacks can be attached for status handlings *add_status_callback* and data handling *add_data_callback* respectively. Below is the enum defining the various PandaBox states: +The PandaBox has a PCAP module that can be used to record block values. The base integration implements logic that automatically arms/disarms the PCAP module for the BEC scan interface. Callbacks can be attached for handling status updates *add_status_callback* and data handling *add_data_callback* respectively. Below is the enum defining the various PandaBox states: ```python class PandaState(StrEnum): @@ -33,7 +33,7 @@ These methods include: ### Other useful methods -- `_compile_frame_data_to_dict(frame_data: FrameData, signal_name_key_mapping: dict[str, str] | None = None) -> dict[str, Any]` : Convert FrameData from PandaBox into a dictionary format compatible with Ophyd signals. Optionally map PandaBox signal names to custom signal names. +- `convert_frame_data(frame_data: FrameData, signal_name_key_mapping: dict[str, str] | None = None) -> dict[str, Any]` : Convert FrameData from PandaBox into a dictionary format compatible with Ophyd signals. Optionally map PandaBox signal names to custom signal names. - `_get_signal_names_allowed_for_capture() -> list[str]` : Get a list of all signal keys that can be configured for capture on the PandaBox. - `_get_signal_names_configured_for_capture() -> list[str]` : Get a list of all signal keys that are currently configured for capture on the PandaBox. diff --git a/ophyd_devices/devices/panda_box/panda_box.py b/ophyd_devices/devices/panda_box/panda_box.py index 3e0ecfd..93ba8e9 100644 --- a/ophyd_devices/devices/panda_box/panda_box.py +++ b/ophyd_devices/devices/panda_box/panda_box.py @@ -7,18 +7,27 @@ It wraps a couple of the methods from the scan interface (stage, unstage, pre_sc with PandaBox specific logic to manage the data acquisition and communication with the hardware. Any beamline integration should inherit from this base class and integrate their specific logic -into the on_connected, stage, unstage, pre_scan, kickoff, complete methods as needed. Please -be aware that the on_connected method is wrapped in here and should therefore always call -super().on_connected(). Child integrations should register data callbacks to handle incoming -data from the PandaBox during acquisition, and set the respective data on their ophyd signals. -The utility method convert_frame_data can be used to convert FrameData objects received -to the expected signal dict format. Please be aware that naming conventions here map to the -names of the blocks, and should be mapped to the beamline specific signal names in the child class. +into the on_connected, on_stage, on_unstage, on_pre_scan, on_complete methods as needed. -More generally, a beamline specific integration needs to imlement logic attached to specific -layouts that change dynamically during scans. More importantly, it also needs to make sure that -the correct layout is loaded. Utility methods to load/save layouts to/from files or directly -from/to the PandaBox hardware are provided in this class too. +Please note that the super().on_.. methods should be called to ensure proper initialization +and cleanup. You should only avoid calling the super() methods if you are certain that this +does not jeopardize the proper setup of the PandaBox and PCAP module for data acquisition. + +Example: + +def on_pre_scan(self): + # Custom logic before staging, e.g. checking some conditions or setting up some parameters + return super().on_pre_scan() # Make sure to call return super().on_pre_scan() and return this + status object + +The base integration also includes a data signal with all available PCAP block signals. We allow +children classes to provide signal_aliases during the initialization to rename signals from the +PandaBox to better suited names for the beamline. We recommend to keep these names consistent +to allow for long-term maintainability and their storage in the data files/base. + +Besides the integration, we also provide certain utility methods to directly load/save layouts +from/to the PandaBox hardware or from/to local files. This allows to easily manage the layouts +required for beamline operation. """ from __future__ import annotations @@ -43,6 +52,7 @@ from ophyd_devices.devices.panda_box.utils import get_pcap_capture_fields if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo from bec_server.device_server.devices.devicemanager import DeviceManagerDS + from ophyd import StatusBase as OphydStatusBase logger = bec_logger.logger @@ -235,6 +245,9 @@ class PandaBox(PSIDeviceBase): """ Send a raw command to the PandaBox. This can be used to set for example values on PandaBox block fields directly, e.g. 'BITS.B=1' to set the BITS.B field to 1. + Please note, list of raw commands are not allowed as they have to sent sequentially. + The list[str] input is needed to certain commands that require this syntax, e.g. + ["SEQ1.TABLE>", "1", "1", "0", "0", ""] Args: raw_command (str | list[str]): The raw command to send to the PandaBox. We can also send @@ -247,8 +260,8 @@ class PandaBox(PSIDeviceBase): Notes: Other useful raw commands: - 'BITS.B=1' or similar once to set bit fields - - ['PULSE1.DELAY.UNITS=s', PULSE1.DELAY=0, PULSE1.WIDTH.UNITS=s, PULSE1.WIDTH=0.001] to set multiple fields at once - '*CAPTURE?' to inspect which signals have been configured for capture (PCAP?) TODO to check + """ if isinstance(raw_command, str): raw_command = [raw_command] @@ -515,6 +528,10 @@ class PandaBox(PSIDeviceBase): # or re-evaluate the implemented logic as these methods attempt to partially # setup the PandaBox for data acquisition. + def wait_for_connection(self, timouet: float | None = None) -> bool: + ret = self.send_raw("*IDN?") + return True + def on_connected(self): """ Here we start the data readout thread upon connection to the PandaBox device. @@ -522,11 +539,6 @@ class PandaBox(PSIDeviceBase): connection logic from child classes is executed first. """ # Test connection by sending WHO command which should respond with PandaBox ID - try: - ret = self.send_raw("*IDN?") - except Exception as e: - logger.error(f"Could not connect to PandaBox {self.name} at host {self.host}: {e}") - raise e from e super().on_connected() self.data_thread.start() self.add_data_callback(data_type=PandaState.FRAME, callback=self._receive_frame_data) @@ -544,6 +556,7 @@ class PandaBox(PSIDeviceBase): is disarmed before any additional stopping logic from child classes is executed. """ self._disarm() + self.on_stop() super().stop(success=success) def destroy(self): @@ -555,26 +568,11 @@ class PandaBox(PSIDeviceBase): """ self.data_thread_kill_event.set() # Signal thread to exit self.data_thread_run_event.set() # Unblock thread if waiting + self.on_destroy() super().destroy() - def stage(self) -> list[object] | StatusBase: - """ - Stage the PandaBox device for acquisition. - We make sure that the PandaBox data state is DISARMED before staging, - to ensure that we do not start an acquisition while the PandaBox is still running an acquisition. - This should never hapen in a well-integrated scan interface, but we add this check - to be safe. - - Then we call super().stage() to perform any additional staging logic from child classes. - Finally, we start the data readout loop by setting the data_thread_run_event. - - Returns: - list[object] | StatusBase: The result of the super().stage() call. - """ - # First make sure that the data readout loop is not running - if self.staged != Staged.no: - return super().stage() - self.stopped = False + def on_stage(self) -> StatusBase | OphydStatusBase | None: + """On stage hook for the PandaBox. Here we make sure that the PandaBox is disarmed before staging.""" status = StatusBase(obj=self) self.add_status_callback(status=status, success=[PandaState.DISARMED], failure=[]) try: @@ -585,40 +583,29 @@ class PandaBox(PSIDeviceBase): raise RuntimeError( f"PandaBox {self.name} did not disarm properly. Please check the connection and the device integration." ) - - ret = super().stage() self.data_thread_run_event.set() # Start data readout loop - return ret + return super().on_stage() - def pre_scan(self) -> StatusBase: + def on_pre_scan(self) -> StatusBase | OphydStatusBase | None: """ - Prepare the PandaBox device for scan acquisition. It is important here that - the PCAP module is armed for acquisition only after the data readout loop is started. - We therefore add the logic here after any additional pre_scan logic from child classes. + On pre_scan hook for the PandaBox. We use this hook to arm the PCAP module for data acquisition. + This logic makes sure that the data readout loop is started and that we received the READY event + from the device. Only then can the PCAP module aquire data. """ - status = super().pre_scan() - status_ready_data_received = StatusBase(obj=self) - status_ready_data_received.add_callback(self._pre_scan_status_callback) + status = StatusBase(obj=self) + status.add_callback(self._pre_scan_status_callback) self.add_status_callback( - status=status_ready_data_received, - success=[PandaState.READY], - failure=[PandaState.FRAME, PandaState.END], + status=status, success=[PandaState.READY], failure=[PandaState.FRAME, PandaState.END] ) + self.cancel_on_stop(status) # Make sure status is cancelled if externally stopped + return status - if status: - ret_status = status_ready_data_received & status - else: - ret_status = status_ready_data_received - self.cancel_on_stop(ret_status) # Make sure status is cancelled if externally stopped - return ret_status - - def unstage(self) -> list[object] | StatusBase: + def on_unstage(self) -> list[object] | StatusBase | OphydStatusBase: """ Any unstaging of the PandaBox device should ensure that""" - ret = super().unstage() self.data_thread_run_event.clear() # Make sure that the data readout loop is stopped - self._disarm() # Disarm the PandaBox, should be idempotent (TODO to check) - return ret + self._disarm() # Disarm the PandaBox, should be idempotent + return super().on_unstage() ####################### ### Utility Methods ### @@ -627,7 +614,6 @@ class PandaBox(PSIDeviceBase): def _get_signal_names_allowed_for_capture(self) -> list[str]: """Utility method to get a list of all signal keys that CAN BE CONFIGURED for capture on the PandaBox.""" ret = self.send_raw("*CAPTURE.*?") - # TODO check proper unpacking of returned keys return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")] def _get_signal_names_configured_for_capture(self) -> list[str]: