fix(pandabox): improve docs and readme

This commit is contained in:
2026-02-16 21:30:58 +01:00
parent 016e830852
commit a3e27419af
2 changed files with 49 additions and 63 deletions

View File

@@ -5,11 +5,11 @@ Short Doumentation of the PandaBox Device Integration in Ophyd Devices
The PandaBox integration provides a base class for interfacing with PandaBox hardware from Diamond Light Source. This implementation wraps the PandaBox hardware as a `PSIDeviceBase` device, integrating it into the BEC scan interface. It uses the `pandablocks` library for communication and data acquisition. Beamline-specific implementations should use the *on_hook* methods from PSIDeviceBase to implement custom logic.
**IMPORTANT** : If the `on_connected()` method is overridden by a child class, it must always call `super().on_connected()` first to ensure proper initialization of the PandaBox device. This is implemented in the [PandaBox class](./panda_box.py).
**IMPORTANT** : If the `on_connected()` method is overridden by a child class, it must always call `super().on_connected()` first to ensure proper initialization of the PandaBox device. This is implemented in the [PandaBox class](./panda_box.py). The same is true for all the other *on_hook* methods, which may contain important logic for the proper setup for the scan. Only skip them if you are sure this logic is not required.
### PandaState
The PandaBox has a PCAP module that can be used to record block values. The base integration implements logic that automatically arms/disarms the PCAP module for the BEC scan interface. Callbacks can be attached for status handlings *add_status_callback* and data handling *add_data_callback* respectively. Below is the enum defining the various PandaBox states:
The PandaBox has a PCAP module that can be used to record block values. The base integration implements logic that automatically arms/disarms the PCAP module for the BEC scan interface. Callbacks can be attached for handling status updates *add_status_callback* and data handling *add_data_callback* respectively. Below is the enum defining the various PandaBox states:
```python
class PandaState(StrEnum):
@@ -33,7 +33,7 @@ These methods include:
### Other useful methods
- `_compile_frame_data_to_dict(frame_data: FrameData, signal_name_key_mapping: dict[str, str] | None = None) -> dict[str, Any]` : Convert FrameData from PandaBox into a dictionary format compatible with Ophyd signals. Optionally map PandaBox signal names to custom signal names.
- `convert_frame_data(frame_data: FrameData, signal_name_key_mapping: dict[str, str] | None = None) -> dict[str, Any]` : Convert FrameData from PandaBox into a dictionary format compatible with Ophyd signals. Optionally map PandaBox signal names to custom signal names.
- `_get_signal_names_allowed_for_capture() -> list[str]` : Get a list of all signal keys that can be configured for capture on the PandaBox.
- `_get_signal_names_configured_for_capture() -> list[str]` : Get a list of all signal keys that are currently configured for capture on the PandaBox.

View File

@@ -7,18 +7,27 @@ It wraps a couple of the methods from the scan interface (stage, unstage, pre_sc
with PandaBox specific logic to manage the data acquisition and communication with the hardware.
Any beamline integration should inherit from this base class and integrate their specific logic
into the on_connected, stage, unstage, pre_scan, kickoff, complete methods as needed. Please
be aware that the on_connected method is wrapped in here and should therefore always call
super().on_connected(). Child integrations should register data callbacks to handle incoming
data from the PandaBox during acquisition, and set the respective data on their ophyd signals.
The utility method convert_frame_data can be used to convert FrameData objects received
to the expected signal dict format. Please be aware that naming conventions here map to the
names of the blocks, and should be mapped to the beamline specific signal names in the child class.
into the on_connected, on_stage, on_unstage, on_pre_scan, on_complete methods as needed.
More generally, a beamline specific integration needs to imlement logic attached to specific
layouts that change dynamically during scans. More importantly, it also needs to make sure that
the correct layout is loaded. Utility methods to load/save layouts to/from files or directly
from/to the PandaBox hardware are provided in this class too.
Please note that the super().on_.. methods should be called to ensure proper initialization
and cleanup. You should only avoid calling the super() methods if you are certain that this
does not jeopardize the proper setup of the PandaBox and PCAP module for data acquisition.
Example:
def on_pre_scan(self):
# Custom logic before staging, e.g. checking some conditions or setting up some parameters
return super().on_pre_scan() # Make sure to call return super().on_pre_scan() and return this
status object
The base integration also includes a data signal with all available PCAP block signals. We allow
children classes to provide signal_aliases during the initialization to rename signals from the
PandaBox to better suited names for the beamline. We recommend to keep these names consistent
to allow for long-term maintainability and their storage in the data files/base.
Besides the integration, we also provide certain utility methods to directly load/save layouts
from/to the PandaBox hardware or from/to local files. This allows to easily manage the layouts
required for beamline operation.
"""
from __future__ import annotations
@@ -43,6 +52,7 @@ from ophyd_devices.devices.panda_box.utils import get_pcap_capture_fields
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
from ophyd import StatusBase as OphydStatusBase
logger = bec_logger.logger
@@ -235,6 +245,9 @@ class PandaBox(PSIDeviceBase):
"""
Send a raw command to the PandaBox. This can be used to set for example
values on PandaBox block fields directly, e.g. 'BITS.B=1' to set the BITS.B field to 1.
Please note, list of raw commands are not allowed as they have to sent sequentially.
The list[str] input is needed to certain commands that require this syntax, e.g.
["SEQ1.TABLE>", "1", "1", "0", "0", ""]
Args:
raw_command (str | list[str]): The raw command to send to the PandaBox. We can also send
@@ -247,8 +260,8 @@ class PandaBox(PSIDeviceBase):
Notes:
Other useful raw commands:
- 'BITS.B=1' or similar once to set bit fields
- ['PULSE1.DELAY.UNITS=s', PULSE1.DELAY=0, PULSE1.WIDTH.UNITS=s, PULSE1.WIDTH=0.001] to set multiple fields at once
- '*CAPTURE?' to inspect which signals have been configured for capture (PCAP?) TODO to check
"""
if isinstance(raw_command, str):
raw_command = [raw_command]
@@ -515,6 +528,10 @@ class PandaBox(PSIDeviceBase):
# or re-evaluate the implemented logic as these methods attempt to partially
# setup the PandaBox for data acquisition.
def wait_for_connection(self, timouet: float | None = None) -> bool:
ret = self.send_raw("*IDN?")
return True
def on_connected(self):
"""
Here we start the data readout thread upon connection to the PandaBox device.
@@ -522,11 +539,6 @@ class PandaBox(PSIDeviceBase):
connection logic from child classes is executed first.
"""
# Test connection by sending WHO command which should respond with PandaBox ID
try:
ret = self.send_raw("*IDN?")
except Exception as e:
logger.error(f"Could not connect to PandaBox {self.name} at host {self.host}: {e}")
raise e from e
super().on_connected()
self.data_thread.start()
self.add_data_callback(data_type=PandaState.FRAME, callback=self._receive_frame_data)
@@ -544,6 +556,7 @@ class PandaBox(PSIDeviceBase):
is disarmed before any additional stopping logic from child classes is executed.
"""
self._disarm()
self.on_stop()
super().stop(success=success)
def destroy(self):
@@ -555,26 +568,11 @@ class PandaBox(PSIDeviceBase):
"""
self.data_thread_kill_event.set() # Signal thread to exit
self.data_thread_run_event.set() # Unblock thread if waiting
self.on_destroy()
super().destroy()
def stage(self) -> list[object] | StatusBase:
"""
Stage the PandaBox device for acquisition.
We make sure that the PandaBox data state is DISARMED before staging,
to ensure that we do not start an acquisition while the PandaBox is still running an acquisition.
This should never hapen in a well-integrated scan interface, but we add this check
to be safe.
Then we call super().stage() to perform any additional staging logic from child classes.
Finally, we start the data readout loop by setting the data_thread_run_event.
Returns:
list[object] | StatusBase: The result of the super().stage() call.
"""
# First make sure that the data readout loop is not running
if self.staged != Staged.no:
return super().stage()
self.stopped = False
def on_stage(self) -> StatusBase | OphydStatusBase | None:
"""On stage hook for the PandaBox. Here we make sure that the PandaBox is disarmed before staging."""
status = StatusBase(obj=self)
self.add_status_callback(status=status, success=[PandaState.DISARMED], failure=[])
try:
@@ -585,40 +583,29 @@ class PandaBox(PSIDeviceBase):
raise RuntimeError(
f"PandaBox {self.name} did not disarm properly. Please check the connection and the device integration."
)
ret = super().stage()
self.data_thread_run_event.set() # Start data readout loop
return ret
return super().on_stage()
def pre_scan(self) -> StatusBase:
def on_pre_scan(self) -> StatusBase | OphydStatusBase | None:
"""
Prepare the PandaBox device for scan acquisition. It is important here that
the PCAP module is armed for acquisition only after the data readout loop is started.
We therefore add the logic here after any additional pre_scan logic from child classes.
On pre_scan hook for the PandaBox. We use this hook to arm the PCAP module for data acquisition.
This logic makes sure that the data readout loop is started and that we received the READY event
from the device. Only then can the PCAP module aquire data.
"""
status = super().pre_scan()
status_ready_data_received = StatusBase(obj=self)
status_ready_data_received.add_callback(self._pre_scan_status_callback)
status = StatusBase(obj=self)
status.add_callback(self._pre_scan_status_callback)
self.add_status_callback(
status=status_ready_data_received,
success=[PandaState.READY],
failure=[PandaState.FRAME, PandaState.END],
status=status, success=[PandaState.READY], failure=[PandaState.FRAME, PandaState.END]
)
self.cancel_on_stop(status) # Make sure status is cancelled if externally stopped
return status
if status:
ret_status = status_ready_data_received & status
else:
ret_status = status_ready_data_received
self.cancel_on_stop(ret_status) # Make sure status is cancelled if externally stopped
return ret_status
def unstage(self) -> list[object] | StatusBase:
def on_unstage(self) -> list[object] | StatusBase | OphydStatusBase:
"""
Any unstaging of the PandaBox device should ensure that"""
ret = super().unstage()
self.data_thread_run_event.clear() # Make sure that the data readout loop is stopped
self._disarm() # Disarm the PandaBox, should be idempotent (TODO to check)
return ret
self._disarm() # Disarm the PandaBox, should be idempotent
return super().on_unstage()
#######################
### Utility Methods ###
@@ -627,7 +614,6 @@ class PandaBox(PSIDeviceBase):
def _get_signal_names_allowed_for_capture(self) -> list[str]:
"""Utility method to get a list of all signal keys that CAN BE CONFIGURED for capture on the PandaBox."""
ret = self.send_raw("*CAPTURE.*?")
# TODO check proper unpacking of returned keys
return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")]
def _get_signal_names_configured_for_capture(self) -> list[str]: