mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-02-20 09:18:42 +01:00
feat(pandabox): Add initial integration of the pandabox
This commit is contained in:
0
ophyd_devices/devices/pandabox/__init__.py
Normal file
0
ophyd_devices/devices/pandabox/__init__.py
Normal file
624
ophyd_devices/devices/pandabox/pandabox.py
Normal file
624
ophyd_devices/devices/pandabox/pandabox.py
Normal file
@@ -0,0 +1,624 @@
|
||||
"""
|
||||
Module to interface with a PandaBox device. It requires the 'host' of the PandaBox to
|
||||
be able to connect to it via the BlockingClient from the pandablocks library.
|
||||
|
||||
This module contains a base integration of the PandaBox hardware as a PSIDeviceBase device.
|
||||
It wraps a couple of the methods from the scan interface (stage, unstage, pre_scan, stop, destroy)
|
||||
with PandaBox specific logic to manage the data acquisition and communication with the hardware.
|
||||
|
||||
Any beamline integration should inherit from this base class and integrate their specific logic
|
||||
into the on_connected, stage, unstage, pre_scan, kickoff, complete methods as needed. Please
|
||||
be aware that the on_connected method is wrapped in here and should therefore always call
|
||||
super().on_connected(). Child integrations should register data callbacks to handle incoming
|
||||
data from the PandaBox during acquisition, and set the respective data on their ophyd signals.
|
||||
The utility method _compile_frame_data_to_dict can be used to convert FrameData objects received
|
||||
to the expected signal dict format. Please be aware that naming conventions here map to the
|
||||
names of the blocks, and should be mapped to the beamline specific signal names in the child class.
|
||||
|
||||
More generally, a beamline specific integration needs to imlement logic attached to specific
|
||||
layouts that change dynamically during scans. More importantly, it also needs to make sure that
|
||||
the correct layout is loaded. Utility methods to load/save layouts to/from files or directly
|
||||
from/to the PandaBox hardware are provided in this class too.
|
||||
"""
|
||||
|
||||
import os
|
||||
import threading
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
from enum import StrEnum
|
||||
from typing import TYPE_CHECKING, Any, Callable, TypeAlias, Union
|
||||
|
||||
from bec_lib import bec_logger
|
||||
from ophyd.status import WaitTimeoutError
|
||||
from pandablocks.blocking import BlockingClient
|
||||
from pandablocks.commands import (
|
||||
Arm,
|
||||
Disarm,
|
||||
GetBlockInfo,
|
||||
GetChanges,
|
||||
GetFieldInfo,
|
||||
GetPcapBitsLabels,
|
||||
GetState,
|
||||
Raw,
|
||||
SetState,
|
||||
)
|
||||
from pandablocks.responses import EndData, FrameData, ReadyData, StartData
|
||||
|
||||
from ophyd_devices import PSIDeviceBase, StatusBase
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
##########################
|
||||
### Utility functions ###
|
||||
##########################
|
||||
|
||||
|
||||
def load_layout_from_panda(host: str) -> list[str]:
|
||||
"""Load the current layout from the PandaBox.
|
||||
|
||||
Args:
|
||||
host (str): The hostname of the PandaBox.
|
||||
|
||||
Returns:
|
||||
list[str]: The current layout of the PandaBox device. Please check module dockstring for more info
|
||||
|
||||
|
||||
"""
|
||||
with BlockingClient(host) as client:
|
||||
state = client.send(GetState())
|
||||
return state
|
||||
|
||||
|
||||
def load_layout_to_panda(host: str, layout: list[str]) -> None:
|
||||
"""Load a layout to the PandaBox.
|
||||
|
||||
Args:
|
||||
host (str): The hostname of the PandaBox.
|
||||
layout (list[str]): The layout to load to the PandaBox. See module docstring for more info.
|
||||
"""
|
||||
with BlockingClient(host) as client:
|
||||
client.send(SetState(layout))
|
||||
|
||||
|
||||
def save_panda_layout_to_file(host: str, file_path: str) -> None:
|
||||
"""
|
||||
Save the currently loaded layout from the PandaBox to a local file.
|
||||
|
||||
Args:
|
||||
host (str): The hostname of the PandaBox.
|
||||
file_path (str): The path to the file where the layout will be saved.
|
||||
"""
|
||||
layout = "\n".join(load_layout_from_panda(host))
|
||||
with open(file_path, "w") as file:
|
||||
file.write(layout)
|
||||
|
||||
|
||||
def load_layout_from_file_to_panda(host: str, file_path: str) -> None:
|
||||
"""
|
||||
Load a layout from a local file to the PandaBox.
|
||||
|
||||
Args:
|
||||
host (str): The hostname of the PandaBox.
|
||||
file_path (str): The path to the file from which the layout will be loaded.
|
||||
"""
|
||||
if not os.path.exists(file_path):
|
||||
raise FileNotFoundError(f"Could not find layout for file path: {file_path}.")
|
||||
with open(file_path, "r") as f:
|
||||
layout = f.read().splitlines()
|
||||
load_layout_to_panda(host, layout)
|
||||
|
||||
|
||||
########################
|
||||
### PandaBox Device ###
|
||||
########################
|
||||
|
||||
|
||||
class PandaDataEvents(StrEnum):
|
||||
"""
|
||||
Events from the PandaBox data stream. The events READY, START, FRAME, END correspond to
|
||||
actual data frames received from the PandaBox. The DISARMED event is used to indicate
|
||||
that the PandaBox has been disarmed, either after a complete data acquisition or after
|
||||
an interrupted acquisition.
|
||||
"""
|
||||
|
||||
READY = "ready"
|
||||
START = "start"
|
||||
FRAME = "frame"
|
||||
END = "end"
|
||||
DISARMED = "disarmed"
|
||||
|
||||
|
||||
LITERAL_PANDA_DATA_EVENTS: TypeAlias = Union[
|
||||
PandaDataEvents.READY.value,
|
||||
PandaDataEvents.START.value,
|
||||
PandaDataEvents.FRAME.value,
|
||||
PandaDataEvents.END.value,
|
||||
]
|
||||
|
||||
LITERAL_PANDA_COMMANDS: TypeAlias = Union[
|
||||
Raw, Arm, Disarm, GetChanges, GetBlockInfo, GetFieldInfo, GetPcapBitsLabels
|
||||
]
|
||||
|
||||
LITERAL_PANDA_DATA: TypeAlias = Union[ReadyData, StartData, FrameData, EndData]
|
||||
|
||||
|
||||
class PandaBox(PSIDeviceBase):
|
||||
"""
|
||||
Base class for PandaBox devices. Beamline integrations should inherit from this base class,
|
||||
to integrate pre-defined PandaBox layout directly into the BEC scan interface, stage/unstage,
|
||||
trigger/complete, pre_scan or kickoff methods.
|
||||
|
||||
"""
|
||||
|
||||
USER_ACCESS = [
|
||||
"send_raw",
|
||||
"add_status_callback",
|
||||
"remove_status_callback",
|
||||
"get_panda_data_state",
|
||||
]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
host: str,
|
||||
scan_info: "ScanInfo" | None = None,
|
||||
device_manager: "DeviceManagerDS" | None = None,
|
||||
kwargs,
|
||||
) -> None:
|
||||
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
|
||||
self.host = host
|
||||
|
||||
# Lock
|
||||
self._lock = threading.RLock()
|
||||
self._panda_data_state: PandaDataEvents | str = PandaDataEvents.DISARMED.value
|
||||
|
||||
# Status callback management
|
||||
self._status_callbacks: dict[uuid.UUID, dict[str, Any]] = {}
|
||||
|
||||
# Data callbacks management
|
||||
self._data_callbacks: dict[uuid.UUID, Callable[[LITERAL_PANDA_DATA], None]] = {}
|
||||
|
||||
# Thread to receive data from the PandaBox
|
||||
self.data_thread: threading.Thread = threading.Thread(
|
||||
target=self._data_thread_loop, daemon=True
|
||||
)
|
||||
self.data_thread_kill_event = threading.Event()
|
||||
self.data_thread_run_event = threading.Event()
|
||||
|
||||
##########################
|
||||
### Public API methods ###
|
||||
##########################
|
||||
|
||||
def send_raw(self, raw_command: Union[str, list[str]]) -> Any:
|
||||
"""
|
||||
Send a raw command to the PandaBox. This can be used to set for example
|
||||
values on PandaBox block fields directly, e.g. 'BITS.B=1' to set the BITS.B field to 1.
|
||||
|
||||
Args:
|
||||
raw_command (str | list[str]): The raw command to send to the PandaBox. We can also send
|
||||
a list of raw commands at once. This will be executed sequentially by
|
||||
the PandaBox client.
|
||||
|
||||
Returns:
|
||||
Any: The response from the PandaBox client.
|
||||
|
||||
Notes:
|
||||
Other useful raw commands:
|
||||
- 'BITS.B=1' or similar once to set bit fields
|
||||
- ['PULSE1.DELAY.UNITS=s', PULSE1.DELAY=0, PULSE1.WIDTH.UNITS=s, PULSE1.WIDTH=0.001] to set multiple fields at once
|
||||
- '*CAPTURE?' to inspect which signals have been configured for capture (PCAP?) TODO to check
|
||||
"""
|
||||
return self._send_command(Raw(raw_command))
|
||||
|
||||
def add_status_callback(
|
||||
self,
|
||||
status: StatusBase,
|
||||
success: list[PandaDataEvents],
|
||||
failure: list[PandaDataEvents],
|
||||
check_directly: bool = True,
|
||||
) -> str:
|
||||
"""
|
||||
This methods registers a status callback to the data receiving loop that will resolve
|
||||
if the PandaBox receives specific data events. It is used to allow asynchronous resolution
|
||||
of status objects based on PandaBox events. Per default, the callback checks the current
|
||||
panda_data_state directly to see if the status can be resolved immediately. This is useful
|
||||
when the status is created after the PandaBox has already sent some data events. However, this
|
||||
can also be disabled by setting check_directly to False.
|
||||
|
||||
Args:
|
||||
status (StatusBase): The status object to register the callback for.
|
||||
success (list[PandaDataEvents]): The list of PandaBox data events that will resolve
|
||||
the status as successful.
|
||||
failure (list[PandaDataEvents]): The list of PandaBox data events that will resolve
|
||||
the status as failed.
|
||||
check_directly (bool): Whether to check the current panda_data_state directly
|
||||
to resolve the status immediately. Defaults to True.
|
||||
|
||||
Returns:
|
||||
str: The unique ID of the registered callback. This can be used to remove the callback. If the
|
||||
status is resolved directly, an empty string is returned.
|
||||
"""
|
||||
with self._lock:
|
||||
if check_directly:
|
||||
current_state = self.panda_data_state
|
||||
if current_state in success and not status.done:
|
||||
status.set_finished()
|
||||
return ""
|
||||
elif current_state in failure and not status.done:
|
||||
status.set_exception(
|
||||
RuntimeError(
|
||||
f"Status with success conditions {success} and failure conditions {failure} "
|
||||
f"due to PandaBox already being in failure state: {current_state}"
|
||||
)
|
||||
)
|
||||
return ""
|
||||
cb_id = str(uuid.uuid4())
|
||||
self._status_callbacks[cb_id] = {
|
||||
"status": status,
|
||||
"success": success,
|
||||
"failure": failure,
|
||||
}
|
||||
return cb_id
|
||||
|
||||
def remove_status_callback(self, cb_id: str) -> None:
|
||||
"""
|
||||
Remove a previously registered status callback.
|
||||
|
||||
Args:
|
||||
cb_id (str): The unique ID of the callback to remove.
|
||||
"""
|
||||
with self._lock:
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
|
||||
def add_data_callback(
|
||||
self,
|
||||
callback: Callable[[LITERAL_PANDA_DATA], None],
|
||||
data_type: LITERAL_PANDA_DATA_EVENTS = PandaDataEvents.FRAME.value,
|
||||
) -> str:
|
||||
"""
|
||||
Register a data callback to be called whenever new data is received from the PandaBox.
|
||||
|
||||
Args:
|
||||
callback (Callable[[LITERAL_PANDA_DATA], None]): The callback function to register. It should accept
|
||||
a single argument of type LITERAL_PANDA_DATA (see notes).
|
||||
data_type ("ready", "start", "frame", "end"): The type of data to register the callback for.
|
||||
Defaults to "frame".
|
||||
|
||||
Returns:
|
||||
str: The unique ID of the registered callback. This can be used to remove the callback.
|
||||
"""
|
||||
with self._lock:
|
||||
cb_id = str(uuid.uuid4())
|
||||
self._data_callbacks[cb_id] = {"callback": callback, "data_type": data_type}
|
||||
return cb_id
|
||||
|
||||
def remove_data_callback(self, cb_id: str) -> None:
|
||||
"""
|
||||
Remove a previously registered data callback.
|
||||
|
||||
Args:
|
||||
cb_id (str): The unique ID of the callback to remove.
|
||||
"""
|
||||
with self._lock:
|
||||
self._data_callbacks.pop(cb_id, None)
|
||||
|
||||
def get_panda_data_state(self) -> str:
|
||||
"""Get current panda data state."""
|
||||
return self.panda_data_state
|
||||
|
||||
#########################
|
||||
### State management ###
|
||||
#########################
|
||||
|
||||
@property
|
||||
def panda_data_state(self) -> str:
|
||||
"""Get the current state of the data acquisition on the PandaBox."""
|
||||
return (
|
||||
self._panda_data_state.value
|
||||
if isinstance(self._panda_data_state, StrEnum)
|
||||
else self._panda_data_state
|
||||
)
|
||||
|
||||
@panda_data_state.setter
|
||||
def panda_data_state(self, value: LITERAL_PANDA_COMMANDS | str) -> None:
|
||||
"""Set the current state of the data acquisition on the PandaBox."""
|
||||
with self._lock:
|
||||
self._panda_data_state = value
|
||||
|
||||
################################
|
||||
### Data readout management ###
|
||||
################################
|
||||
|
||||
def _data_thread_loop(self) -> None:
|
||||
"""
|
||||
This method runs a loop in the data_thread and handle data readouts from the PandaBox.
|
||||
The loop will be activated when the data_thread_run_event is set, and will exit when the
|
||||
data_thread_kill_event is set. Please make sure to first set the kill event, and also the
|
||||
run_event to unblock the thread such that it can exit cleanly.
|
||||
"""
|
||||
while not self.data_thread_kill_event.is_set():
|
||||
self.data_thread_run_event.wait() # Block until started
|
||||
if self.data_thread_kill_event.is_set():
|
||||
break # Break loop if kill event is set after waiting is unblocked
|
||||
self._run_data_readout()
|
||||
|
||||
def _run_data_readout(self) -> None:
|
||||
"""
|
||||
Data readoud loop. This method connects to the PandaBox with a BlockingClient,
|
||||
receiving data messages. There are 4 types of data messages:
|
||||
- ReadyData: Indicates that the PandaBox is ready for data acquisition.
|
||||
- StartData: Indicates the start of a data acquisition.
|
||||
- FrameData: Contains a frame of data acquired from the PandaBox.
|
||||
- EndData: Indicates the end of a data acquisition.
|
||||
|
||||
Upon receiving each type of data message, the panda_data_state is updated accordingly,
|
||||
and any registered callbacks for that event are executed. This allows to handle callbacks
|
||||
for each stage of the data acquisition process. For example, a child class could add a
|
||||
status callback to resolve during a specific stage of the data acquisition based on an
|
||||
event received here.
|
||||
|
||||
# NOTE: The receiving loop has to be started before the ARM() command is sent to the PandaBox.
|
||||
# The required sequence is to (1) start the data readout loop and receive ReadyData,
|
||||
# (2) send the ARM() command to the PandaBox to start acquisition, (3) receive StartData and FrameData,
|
||||
# (4) receive EndData when acquisition is complete. When an acquisition is interrupted prematurely, we have
|
||||
# to ensure that we send the DISARM() command to the PandaBox to stop the acquisition cleanly. Multiple disarm
|
||||
# commands are safe to send, so we can always ensure that we disarm at the end of the readout loop. (TODO to check).
|
||||
"""
|
||||
with BlockingClient(self.host) as client:
|
||||
try:
|
||||
for data in client.data(scaled=False):
|
||||
if isinstance(data, ReadyData):
|
||||
self._run_status_callbacks(PandaDataEvents.READY)
|
||||
self._run_data_callbacks(data, PandaDataEvents.READY.value)
|
||||
|
||||
elif isinstance(data, StartData):
|
||||
self._run_status_callbacks(PandaDataEvents.START)
|
||||
self._run_data_callbacks(data, PandaDataEvents.START.value)
|
||||
|
||||
elif isinstance(data, FrameData):
|
||||
self._run_status_callbacks(PandaDataEvents.FRAME)
|
||||
self._run_data_callbacks(data, PandaDataEvents.FRAME.value)
|
||||
|
||||
elif isinstance(data, EndData):
|
||||
self._run_status_callbacks(PandaDataEvents.END)
|
||||
self._run_data_callbacks(data, PandaDataEvents.END.value)
|
||||
break # Exit data readout loop
|
||||
|
||||
finally:
|
||||
# NOTE: This block ensures that we properly cleanup after a data acquisition,
|
||||
# whether it completed successfully or was interrupted. This includes sending
|
||||
# the DISARM() command to the PandaBox to stop any ongoing acquisition in case
|
||||
# we exited the loop prematurely. It also clears the data_thread_run_event to block
|
||||
# the data readout loop again, and runs the DISARMED status callbacks to notify
|
||||
# any registered status objects that the PandaBox is now disarmed. DISARMED is the
|
||||
# expected safe state of the data receiving loop from the PandaBox and was added
|
||||
# in addition to the existing READY, START, FRAME, END events created from the existing
|
||||
# PandaBox data messages.
|
||||
client.send(Disarm()) # Ensure we disarm at the end
|
||||
self.data_thread_run_event.clear()
|
||||
self._run_status_callbacks(PandaDataEvents.DISARMED)
|
||||
|
||||
def _run_status_callbacks(self, event: PandaDataEvents) -> None:
|
||||
"""
|
||||
Run registered status callbacks for a given PandaBox data event.
|
||||
These callbacks are used to resolve status objects that are registered
|
||||
to resolve in success/failure based on PandaBox data events. They are
|
||||
commonly used in the scan interface methods (pre_scan, kickoff, trigger or complete).
|
||||
and allow to for asynchronous resolution of these methods based on PandaBox data events.
|
||||
|
||||
NOTE : Status callbacks are removed once they are resolved (either success or failure).
|
||||
|
||||
Args:
|
||||
event (PandaDataEvents): The PandaBox data event that occurred.
|
||||
data (LITERAL_PANDA_DATA): The data associated with the event.
|
||||
"""
|
||||
self.panda_data_state = event
|
||||
with self._lock:
|
||||
callbacks_to_remove = []
|
||||
for cb_id, cb_info in self._status_callbacks.items():
|
||||
status: StatusBase = cb_info["status"]
|
||||
success_events: list[PandaDataEvents] = cb_info["success"]
|
||||
failure_events: list[PandaDataEvents] = cb_info["failure"]
|
||||
|
||||
if event in success_events and not status.done:
|
||||
status.set_finished()
|
||||
callbacks_to_remove.append(cb_id)
|
||||
elif event in failure_events and not status.done:
|
||||
status.set_exception(
|
||||
RuntimeError(
|
||||
f"Status with success conditions {success_events} and failure conditions {failure_events} "
|
||||
f"due to PandaBox receiving failure event: {event}"
|
||||
)
|
||||
)
|
||||
callbacks_to_remove.append(cb_id)
|
||||
for cb_id in callbacks_to_remove:
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
|
||||
def _run_data_callbacks(
|
||||
self, data: LITERAL_PANDA_DATA_EVENTS, event_type: LITERAL_PANDA_DATA_EVENTS
|
||||
) -> None:
|
||||
"""
|
||||
Placeholder method to run data callbacks for received PandaBox data.
|
||||
Child classes can override this method to implement custom behavior
|
||||
upon receiving different types of PandaBox data.
|
||||
NOTE: Data callbacks are not removed after being called, as they may be
|
||||
intended to be called multiple times during a data acquisition.
|
||||
|
||||
Args:
|
||||
data (LITERAL_PANDA_DATA_EVENTS): The data received from the PandaBox.
|
||||
event_type (LITERAL_PANDA_DATA_EVENTS): The type of data received. This can be
|
||||
"ready", "start", "frame", or "end".
|
||||
"""
|
||||
with self._lock:
|
||||
for cb_info in self._data_callbacks.values():
|
||||
callback: Callable[[LITERAL_PANDA_DATA_EVENTS], None] = cb_info["callback"]
|
||||
cb_data_type: LITERAL_PANDA_DATA_EVENTS = cb_info["data_type"]
|
||||
if cb_data_type == event_type:
|
||||
callback(data)
|
||||
|
||||
#############################
|
||||
### PSIDeviceBase methods ###
|
||||
#############################
|
||||
|
||||
# NOTE These are beamline hooks for the scan interface within BEC.
|
||||
# If overwritten by child classes, please make sure to either call super()
|
||||
# or re-evaluate the implemented logic as these methods attempt to partially
|
||||
# setup the PandaBox for data acquisition.
|
||||
|
||||
def on_connected(self):
|
||||
"""
|
||||
Here we start the data readout thread upon connection to the PandaBox device.
|
||||
We do this after the super().on_connected() call to ensure that any additional
|
||||
connection logic from child classes is executed first.
|
||||
"""
|
||||
# Test connection by sending WHO command which should respond with PandaBox ID
|
||||
try:
|
||||
ret = self.send_raw("*IDN?")
|
||||
except Exception as e:
|
||||
logger.error(f"Could not connect to PandaBox {self.name} at host {self.host}: {e}")
|
||||
raise e from e
|
||||
super().on_connected()
|
||||
self.data_thread.start()
|
||||
|
||||
def stop(self, *, success=False):
|
||||
"""
|
||||
Stopping the PandaBox device should ensure that the PandaBox is disarmed.
|
||||
We call this prior to the super().stop() call to ensure that the PandaBox
|
||||
is disarmed before any additional stopping logic from child classes is executed.
|
||||
"""
|
||||
super().stop(success=success)
|
||||
self._disarm()
|
||||
|
||||
def destroy(self):
|
||||
"""
|
||||
We append the cleanup of the data readout thread to the destroy method,
|
||||
and call it prior to the super().destroy() call.
|
||||
This ensures that the data readout thread is properly cleaned up
|
||||
when the PandaBox device is destroyed.
|
||||
"""
|
||||
self.data_thread_kill_event.set() # Signal thread to exit
|
||||
self.data_thread_run_event.set() # Unblock thread if waiting
|
||||
super().destroy()
|
||||
|
||||
def stage(self) -> list[object] | StatusBase:
|
||||
"""
|
||||
Stage the PandaBox device for acquisition.
|
||||
We make sure that the PandaBox data state is DISARMED before staging,
|
||||
to ensure that we do not start an acquisition while the PandaBox is still running an acquisition.
|
||||
This should never hapen in a well-integrated scan interface, but we add this check
|
||||
to be safe.
|
||||
|
||||
Then we call super().stage() to perform any additional staging logic from child classes.
|
||||
Finally, we start the data readout loop by setting the data_thread_run_event.
|
||||
|
||||
Returns:
|
||||
list[object] | StatusBase: The result of the super().stage() call.
|
||||
"""
|
||||
# First make sure that the data readout loop is not running
|
||||
status = StatusBase(obj=self)
|
||||
self.add_status_callback(status=status, success=[PandaDataEvents.DISARMED], failure=[])
|
||||
try:
|
||||
status.wait(timeout=3)
|
||||
except WaitTimeoutError:
|
||||
logger.error(f"PandaBox {self.name} did not disarm before staging.")
|
||||
# pylint: disable=raise-from-missing
|
||||
raise RuntimeError(
|
||||
f"PandaBox {self.name} did not disarm properly. Please connection and the device integration."
|
||||
)
|
||||
|
||||
ret = super().stage()
|
||||
self.data_thread_run_event.set() # Start data readout loop
|
||||
return ret
|
||||
|
||||
def pre_scan(self) -> StatusBase:
|
||||
"""
|
||||
Prepare the PandaBox device for scan acquisition. It is important here that
|
||||
the PCAP module is armed for acquisition only after the data readout loop is started.
|
||||
We therefore add the logic here after any additional pre_scan logic from child classes.
|
||||
"""
|
||||
status = super().pre_scan()
|
||||
status_ready_data_received = StatusBase(obj=self)
|
||||
self.add_status_callback(
|
||||
status=status_ready_data_received,
|
||||
success=[PandaDataEvents.READY],
|
||||
failure=[PandaDataEvents.FRAME, PandaDataEvents.END],
|
||||
)
|
||||
status_ready_data_received.add_callback(self._pre_scan_status_callback)
|
||||
if status:
|
||||
ret_status = status_ready_data_received & status
|
||||
else:
|
||||
ret_status = status_ready_data_received
|
||||
self.cancel_on_stop(ret_status)
|
||||
return ret_status
|
||||
|
||||
def unstage(self) -> list[object] | StatusBase:
|
||||
"""
|
||||
Any unstaging of the PandaBox device should ensure that"""
|
||||
ret = super().unstage()
|
||||
self.data_thread_run_event.clear() # Make sure that the data readout loop is stopped
|
||||
self._disarm() # Disarm the PandaBox, should be idempotent (TODO to check)
|
||||
return ret
|
||||
|
||||
#######################
|
||||
### Utility Methods ###
|
||||
#######################
|
||||
|
||||
def _get_signal_names_allowed_for_capture(self) -> list[str]:
|
||||
"""Utility method to get a list of all signal keys that CAN BE CONFIGURED for capture on the PandaBox."""
|
||||
ret = self.send_command(Raw("*CAPTURE.*?"))
|
||||
# TODO check proper unpacking of returned keys
|
||||
return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")]
|
||||
|
||||
def _get_signal_names_configured_for_capture(self) -> list[str]:
|
||||
"""Utility method to get a list of all signal keys thar ARE CURRENTLY CONFIGURED for capture on the PandaBox."""
|
||||
ret = self.send_command(Raw("*CAPTURE?"))
|
||||
return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")]
|
||||
|
||||
def _compile_frame_data_to_dict(self, frame_data: FrameData) -> dict[str, Any]:
|
||||
"""
|
||||
Compile the data from a FrameData object into a dictionary with expected OPHYD
|
||||
read format, e.g. signal {signal_name: {"value": [...]}}.
|
||||
|
||||
Args:
|
||||
frame_data (FrameData): The FrameData object received from the PandaBox.
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: The compiled data in OPHYD read format.
|
||||
"""
|
||||
out = defaultdict(list)
|
||||
data = frame_data.data
|
||||
keys = data.dtype.names
|
||||
for entry in data:
|
||||
for i, key in enumerate(keys):
|
||||
out[key]["value"].append(entry[i])
|
||||
|
||||
def _pre_scan_status_callback(self, status: StatusBase):
|
||||
"""
|
||||
Callback for arming the PCAP module during pre_scan.
|
||||
|
||||
Args:
|
||||
status (StatusBase): The status object to resolve when arming is complete.
|
||||
"""
|
||||
if not status.done:
|
||||
self._arm()
|
||||
status.set_finished()
|
||||
|
||||
def _send_command(self, command: LITERAL_PANDA_COMMANDS) -> Any:
|
||||
"""Send a command to the PandaBox via the BlockingClient."""
|
||||
with BlockingClient(self.host) as client:
|
||||
response = client.send(command)
|
||||
return response
|
||||
|
||||
def _arm(self) -> None:
|
||||
"""Arm the PandaBox device."""
|
||||
self._send_command(Arm())
|
||||
|
||||
def _disarm(self) -> None:
|
||||
"""Disarm the PandaBox device."""
|
||||
self._send_command(Disarm())
|
||||
54
ophyd_devices/devices/pandabox/utility_scripts.py
Normal file
54
ophyd_devices/devices/pandabox/utility_scripts.py
Normal file
@@ -0,0 +1,54 @@
|
||||
"""
|
||||
Module with utility scripts to run on the PandaBox device.
|
||||
|
||||
- Save the PandaBox layout to a local file on disk.
|
||||
Example usage: python ./utility_scripts.py --host panda-box-host.psi.ch --save-layout ./my_layout.ini
|
||||
- Load a PandaBox layout from a local file on disk.
|
||||
Example usage: python ./utility_scripts.py --host panda-box-host.psi.ch --load-layout ./my_layout.ini
|
||||
|
||||
"""
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
from ophyd_devices.devices.pandabox.pandabox import (
|
||||
load_layout_from_file_to_panda,
|
||||
save_panda_layout_to_file,
|
||||
)
|
||||
|
||||
|
||||
def build_argparser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="Load or save a PandaBox layout")
|
||||
|
||||
parser.add_argument("--host", type=str, required=True, help="Hostname of the PandaBox")
|
||||
|
||||
group = parser.add_mutually_exclusive_group(required=True)
|
||||
|
||||
group.add_argument(
|
||||
"--save-layout", type=Path, metavar="FILE", help="Save current PandaBox layout to FILE"
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--load-layout", type=Path, metavar="FILE", help="Load PandaBox layout from FILE"
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Main entry point for the utility script."""
|
||||
parser = build_argparser()
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.save_layout is not None:
|
||||
save_panda_layout_to_file(host=args.host, file_path=args.save_layout)
|
||||
|
||||
elif args.load_layout is not None:
|
||||
load_layout_from_file_to_panda(host=args.host, file_path=args.load_layout)
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -25,6 +25,7 @@ dependencies = [
|
||||
"pytest ~= 8.0",
|
||||
"h5py ~= 3.10",
|
||||
"hdf5plugin >=4.3, < 6.0",
|
||||
"pandablocks ~= 0.10",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
||||
Reference in New Issue
Block a user