refactor(pandabox): fix typehints and usage of pandablock.commands

This commit is contained in:
2026-01-13 11:33:52 +01:00
parent 9b0ec3ac17
commit b53de4ee3d

View File

@@ -28,20 +28,10 @@ from collections import defaultdict
from enum import StrEnum from enum import StrEnum
from typing import TYPE_CHECKING, Any, Callable, TypeAlias, Union from typing import TYPE_CHECKING, Any, Callable, TypeAlias, Union
import pandablocks.commands as pbc
from bec_lib import bec_logger from bec_lib import bec_logger
from ophyd.status import WaitTimeoutError from ophyd.status import WaitTimeoutError
from pandablocks.blocking import BlockingClient from pandablocks.blocking import BlockingClient
from pandablocks.commands import (
Arm,
Disarm,
GetBlockInfo,
GetChanges,
GetFieldInfo,
GetPcapBitsLabels,
GetState,
Raw,
SetState,
)
from pandablocks.responses import EndData, FrameData, ReadyData, StartData from pandablocks.responses import EndData, FrameData, ReadyData, StartData
from ophyd_devices import PSIDeviceBase, StatusBase from ophyd_devices import PSIDeviceBase, StatusBase
@@ -70,7 +60,7 @@ def load_layout_from_panda(host: str) -> list[str]:
""" """
with BlockingClient(host) as client: with BlockingClient(host) as client:
state = client.send(GetState()) state = client.send(pbc.GetState())
return state return state
@@ -82,7 +72,7 @@ def load_layout_to_panda(host: str, layout: list[str]) -> None:
layout (list[str]): The layout to load to the PandaBox. See module docstring for more info. layout (list[str]): The layout to load to the PandaBox. See module docstring for more info.
""" """
with BlockingClient(host) as client: with BlockingClient(host) as client:
client.send(SetState(layout)) client.send(pbc.SetState(layout))
def save_panda_layout_to_file(host: str, file_path: str) -> None: def save_panda_layout_to_file(host: str, file_path: str) -> None:
@@ -141,7 +131,13 @@ LITERAL_PANDA_DATA_EVENTS: TypeAlias = Union[
] ]
LITERAL_PANDA_COMMANDS: TypeAlias = Union[ LITERAL_PANDA_COMMANDS: TypeAlias = Union[
Raw, Arm, Disarm, GetChanges, GetBlockInfo, GetFieldInfo, GetPcapBitsLabels pbc.Raw,
pbc.Arm,
pbc.Disarm,
pbc.GetChanges,
pbc.GetBlockInfo,
pbc.GetFieldInfo,
pbc.GetPcapBitsLabels,
] ]
LITERAL_PANDA_DATA: TypeAlias = Union[ReadyData, StartData, FrameData, EndData] LITERAL_PANDA_DATA: TypeAlias = Union[ReadyData, StartData, FrameData, EndData]
@@ -214,7 +210,7 @@ class PandaBox(PSIDeviceBase):
- ['PULSE1.DELAY.UNITS=s', PULSE1.DELAY=0, PULSE1.WIDTH.UNITS=s, PULSE1.WIDTH=0.001] to set multiple fields at once - ['PULSE1.DELAY.UNITS=s', PULSE1.DELAY=0, PULSE1.WIDTH.UNITS=s, PULSE1.WIDTH=0.001] to set multiple fields at once
- '*CAPTURE?' to inspect which signals have been configured for capture (PCAP?) TODO to check - '*CAPTURE?' to inspect which signals have been configured for capture (PCAP?) TODO to check
""" """
return self._send_command(Raw(raw_command)) return self._send_command(pbc.Raw(raw_command))
def add_status_callback( def add_status_callback(
self, self,
@@ -321,12 +317,12 @@ class PandaBox(PSIDeviceBase):
"""Get the current state of the data acquisition on the PandaBox.""" """Get the current state of the data acquisition on the PandaBox."""
return ( return (
self._panda_data_state.value self._panda_data_state.value
if isinstance(self._panda_data_state, StrEnum) if isinstance(self._panda_data_state, PandaDataEvents)
else self._panda_data_state else self._panda_data_state
) )
@panda_data_state.setter @panda_data_state.setter
def panda_data_state(self, value: LITERAL_PANDA_COMMANDS | str) -> None: def panda_data_state(self, value: PandaDataEvents | str) -> None:
"""Set the current state of the data acquisition on the PandaBox.""" """Set the current state of the data acquisition on the PandaBox."""
with self._lock: with self._lock:
self._panda_data_state = value self._panda_data_state = value
@@ -400,7 +396,7 @@ class PandaBox(PSIDeviceBase):
# expected safe state of the data receiving loop from the PandaBox and was added # expected safe state of the data receiving loop from the PandaBox and was added
# in addition to the existing READY, START, FRAME, END events created from the existing # in addition to the existing READY, START, FRAME, END events created from the existing
# PandaBox data messages. # PandaBox data messages.
client.send(Disarm()) # Ensure we disarm at the end client.send(self._disarm()) # Ensure we disarm at the end
self.data_thread_run_event.clear() self.data_thread_run_event.clear()
self._run_status_callbacks(PandaDataEvents.DISARMED) self._run_status_callbacks(PandaDataEvents.DISARMED)
@@ -441,7 +437,7 @@ class PandaBox(PSIDeviceBase):
self._status_callbacks.pop(cb_id, None) self._status_callbacks.pop(cb_id, None)
def _run_data_callbacks( def _run_data_callbacks(
self, data: LITERAL_PANDA_DATA_EVENTS, event_type: LITERAL_PANDA_DATA_EVENTS self, data: LITERAL_PANDA_DATA, event_type: LITERAL_PANDA_DATA_EVENTS
) -> None: ) -> None:
""" """
Placeholder method to run data callbacks for received PandaBox data. Placeholder method to run data callbacks for received PandaBox data.
@@ -451,13 +447,13 @@ class PandaBox(PSIDeviceBase):
intended to be called multiple times during a data acquisition. intended to be called multiple times during a data acquisition.
Args: Args:
data (LITERAL_PANDA_DATA_EVENTS): The data received from the PandaBox. data (LITERAL_PANDA_DATA): The data received from the PandaBox.
event_type (LITERAL_PANDA_DATA_EVENTS): The type of data received. This can be event_type (LITERAL_PANDA_DATA_EVENTS): The type of data received. This can be
"ready", "start", "frame", or "end". "ready", "start", "frame", or "end".
""" """
with self._lock: with self._lock:
for cb_info in self._data_callbacks.values(): for cb_info in self._data_callbacks.values():
callback: Callable[[LITERAL_PANDA_DATA_EVENTS], None] = cb_info["callback"] callback: Callable[[LITERAL_PANDA_DATA], None] = cb_info["callback"]
cb_data_type: LITERAL_PANDA_DATA_EVENTS = cb_info["data_type"] cb_data_type: LITERAL_PANDA_DATA_EVENTS = cb_info["data_type"]
if cb_data_type == event_type: if cb_data_type == event_type:
callback(data) callback(data)
@@ -571,13 +567,13 @@ class PandaBox(PSIDeviceBase):
def _get_signal_names_allowed_for_capture(self) -> list[str]: def _get_signal_names_allowed_for_capture(self) -> list[str]:
"""Utility method to get a list of all signal keys that CAN BE CONFIGURED for capture on the PandaBox.""" """Utility method to get a list of all signal keys that CAN BE CONFIGURED for capture on the PandaBox."""
ret = self.send_command(Raw("*CAPTURE.*?")) ret = self.send_command(self.send_raw("*CAPTURE.*?"))
# TODO check proper unpacking of returned keys # TODO check proper unpacking of returned keys
return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")] return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")]
def _get_signal_names_configured_for_capture(self) -> list[str]: def _get_signal_names_configured_for_capture(self) -> list[str]:
"""Utility method to get a list of all signal keys thar ARE CURRENTLY CONFIGURED for capture on the PandaBox.""" """Utility method to get a list of all signal keys thar ARE CURRENTLY CONFIGURED for capture on the PandaBox."""
ret = self.send_command(Raw("*CAPTURE?")) ret = self.send_command(self.send_raw("*CAPTURE?"))
return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")] return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")]
def _compile_frame_data_to_dict(self, frame_data: FrameData) -> dict[str, Any]: def _compile_frame_data_to_dict(self, frame_data: FrameData) -> dict[str, Any]:
@@ -617,8 +613,8 @@ class PandaBox(PSIDeviceBase):
def _arm(self) -> None: def _arm(self) -> None:
"""Arm the PandaBox device.""" """Arm the PandaBox device."""
self._send_command(Arm()) self._send_command(pbc.Arm())
def _disarm(self) -> None: def _disarm(self) -> None:
"""Disarm the PandaBox device.""" """Disarm the PandaBox device."""
self._send_command(Disarm()) self._send_command(pbc.Disarm())