diff --git a/ophyd_devices/devices/pandabox/pandabox.py b/ophyd_devices/devices/pandabox/pandabox.py index 3fa1872..a2786d3 100644 --- a/ophyd_devices/devices/pandabox/pandabox.py +++ b/ophyd_devices/devices/pandabox/pandabox.py @@ -28,20 +28,10 @@ from collections import defaultdict from enum import StrEnum from typing import TYPE_CHECKING, Any, Callable, TypeAlias, Union +import pandablocks.commands as pbc from bec_lib import bec_logger from ophyd.status import WaitTimeoutError from pandablocks.blocking import BlockingClient -from pandablocks.commands import ( - Arm, - Disarm, - GetBlockInfo, - GetChanges, - GetFieldInfo, - GetPcapBitsLabels, - GetState, - Raw, - SetState, -) from pandablocks.responses import EndData, FrameData, ReadyData, StartData from ophyd_devices import PSIDeviceBase, StatusBase @@ -70,7 +60,7 @@ def load_layout_from_panda(host: str) -> list[str]: """ with BlockingClient(host) as client: - state = client.send(GetState()) + state = client.send(pbc.GetState()) return state @@ -82,7 +72,7 @@ def load_layout_to_panda(host: str, layout: list[str]) -> None: layout (list[str]): The layout to load to the PandaBox. See module docstring for more info. """ with BlockingClient(host) as client: - client.send(SetState(layout)) + client.send(pbc.SetState(layout)) def save_panda_layout_to_file(host: str, file_path: str) -> None: @@ -141,7 +131,13 @@ LITERAL_PANDA_DATA_EVENTS: TypeAlias = Union[ ] LITERAL_PANDA_COMMANDS: TypeAlias = Union[ - Raw, Arm, Disarm, GetChanges, GetBlockInfo, GetFieldInfo, GetPcapBitsLabels + pbc.Raw, + pbc.Arm, + pbc.Disarm, + pbc.GetChanges, + pbc.GetBlockInfo, + pbc.GetFieldInfo, + pbc.GetPcapBitsLabels, ] LITERAL_PANDA_DATA: TypeAlias = Union[ReadyData, StartData, FrameData, EndData] @@ -214,7 +210,7 @@ class PandaBox(PSIDeviceBase): - ['PULSE1.DELAY.UNITS=s', PULSE1.DELAY=0, PULSE1.WIDTH.UNITS=s, PULSE1.WIDTH=0.001] to set multiple fields at once - '*CAPTURE?' to inspect which signals have been configured for capture (PCAP?) TODO to check """ - return self._send_command(Raw(raw_command)) + return self._send_command(pbc.Raw(raw_command)) def add_status_callback( self, @@ -321,12 +317,12 @@ class PandaBox(PSIDeviceBase): """Get the current state of the data acquisition on the PandaBox.""" return ( self._panda_data_state.value - if isinstance(self._panda_data_state, StrEnum) + if isinstance(self._panda_data_state, PandaDataEvents) else self._panda_data_state ) @panda_data_state.setter - def panda_data_state(self, value: LITERAL_PANDA_COMMANDS | str) -> None: + def panda_data_state(self, value: PandaDataEvents | str) -> None: """Set the current state of the data acquisition on the PandaBox.""" with self._lock: self._panda_data_state = value @@ -400,7 +396,7 @@ class PandaBox(PSIDeviceBase): # expected safe state of the data receiving loop from the PandaBox and was added # in addition to the existing READY, START, FRAME, END events created from the existing # PandaBox data messages. - client.send(Disarm()) # Ensure we disarm at the end + client.send(self._disarm()) # Ensure we disarm at the end self.data_thread_run_event.clear() self._run_status_callbacks(PandaDataEvents.DISARMED) @@ -441,7 +437,7 @@ class PandaBox(PSIDeviceBase): self._status_callbacks.pop(cb_id, None) def _run_data_callbacks( - self, data: LITERAL_PANDA_DATA_EVENTS, event_type: LITERAL_PANDA_DATA_EVENTS + self, data: LITERAL_PANDA_DATA, event_type: LITERAL_PANDA_DATA_EVENTS ) -> None: """ Placeholder method to run data callbacks for received PandaBox data. @@ -451,13 +447,13 @@ class PandaBox(PSIDeviceBase): intended to be called multiple times during a data acquisition. Args: - data (LITERAL_PANDA_DATA_EVENTS): The data received from the PandaBox. + data (LITERAL_PANDA_DATA): The data received from the PandaBox. event_type (LITERAL_PANDA_DATA_EVENTS): The type of data received. This can be "ready", "start", "frame", or "end". """ with self._lock: for cb_info in self._data_callbacks.values(): - callback: Callable[[LITERAL_PANDA_DATA_EVENTS], None] = cb_info["callback"] + callback: Callable[[LITERAL_PANDA_DATA], None] = cb_info["callback"] cb_data_type: LITERAL_PANDA_DATA_EVENTS = cb_info["data_type"] if cb_data_type == event_type: callback(data) @@ -571,13 +567,13 @@ class PandaBox(PSIDeviceBase): def _get_signal_names_allowed_for_capture(self) -> list[str]: """Utility method to get a list of all signal keys that CAN BE CONFIGURED for capture on the PandaBox.""" - ret = self.send_command(Raw("*CAPTURE.*?")) + ret = self.send_command(self.send_raw("*CAPTURE.*?")) # TODO check proper unpacking of returned keys return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")] def _get_signal_names_configured_for_capture(self) -> list[str]: """Utility method to get a list of all signal keys thar ARE CURRENTLY CONFIGURED for capture on the PandaBox.""" - ret = self.send_command(Raw("*CAPTURE?")) + ret = self.send_command(self.send_raw("*CAPTURE?")) return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")] def _compile_frame_data_to_dict(self, frame_data: FrameData) -> dict[str, Any]: @@ -617,8 +613,8 @@ class PandaBox(PSIDeviceBase): def _arm(self) -> None: """Arm the PandaBox device.""" - self._send_command(Arm()) + self._send_command(pbc.Arm()) def _disarm(self) -> None: """Disarm the PandaBox device.""" - self._send_command(Disarm()) + self._send_command(pbc.Disarm())