mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-02-20 09:18:42 +01:00
refactor(pandabox): fix typehints and usage of pandablock.commands
This commit is contained in:
@@ -28,20 +28,10 @@ from collections import defaultdict
|
||||
from enum import StrEnum
|
||||
from typing import TYPE_CHECKING, Any, Callable, TypeAlias, Union
|
||||
|
||||
import pandablocks.commands as pbc
|
||||
from bec_lib import bec_logger
|
||||
from ophyd.status import WaitTimeoutError
|
||||
from pandablocks.blocking import BlockingClient
|
||||
from pandablocks.commands import (
|
||||
Arm,
|
||||
Disarm,
|
||||
GetBlockInfo,
|
||||
GetChanges,
|
||||
GetFieldInfo,
|
||||
GetPcapBitsLabels,
|
||||
GetState,
|
||||
Raw,
|
||||
SetState,
|
||||
)
|
||||
from pandablocks.responses import EndData, FrameData, ReadyData, StartData
|
||||
|
||||
from ophyd_devices import PSIDeviceBase, StatusBase
|
||||
@@ -70,7 +60,7 @@ def load_layout_from_panda(host: str) -> list[str]:
|
||||
|
||||
"""
|
||||
with BlockingClient(host) as client:
|
||||
state = client.send(GetState())
|
||||
state = client.send(pbc.GetState())
|
||||
return state
|
||||
|
||||
|
||||
@@ -82,7 +72,7 @@ def load_layout_to_panda(host: str, layout: list[str]) -> None:
|
||||
layout (list[str]): The layout to load to the PandaBox. See module docstring for more info.
|
||||
"""
|
||||
with BlockingClient(host) as client:
|
||||
client.send(SetState(layout))
|
||||
client.send(pbc.SetState(layout))
|
||||
|
||||
|
||||
def save_panda_layout_to_file(host: str, file_path: str) -> None:
|
||||
@@ -141,7 +131,13 @@ LITERAL_PANDA_DATA_EVENTS: TypeAlias = Union[
|
||||
]
|
||||
|
||||
LITERAL_PANDA_COMMANDS: TypeAlias = Union[
|
||||
Raw, Arm, Disarm, GetChanges, GetBlockInfo, GetFieldInfo, GetPcapBitsLabels
|
||||
pbc.Raw,
|
||||
pbc.Arm,
|
||||
pbc.Disarm,
|
||||
pbc.GetChanges,
|
||||
pbc.GetBlockInfo,
|
||||
pbc.GetFieldInfo,
|
||||
pbc.GetPcapBitsLabels,
|
||||
]
|
||||
|
||||
LITERAL_PANDA_DATA: TypeAlias = Union[ReadyData, StartData, FrameData, EndData]
|
||||
@@ -214,7 +210,7 @@ class PandaBox(PSIDeviceBase):
|
||||
- ['PULSE1.DELAY.UNITS=s', PULSE1.DELAY=0, PULSE1.WIDTH.UNITS=s, PULSE1.WIDTH=0.001] to set multiple fields at once
|
||||
- '*CAPTURE?' to inspect which signals have been configured for capture (PCAP?) TODO to check
|
||||
"""
|
||||
return self._send_command(Raw(raw_command))
|
||||
return self._send_command(pbc.Raw(raw_command))
|
||||
|
||||
def add_status_callback(
|
||||
self,
|
||||
@@ -321,12 +317,12 @@ class PandaBox(PSIDeviceBase):
|
||||
"""Get the current state of the data acquisition on the PandaBox."""
|
||||
return (
|
||||
self._panda_data_state.value
|
||||
if isinstance(self._panda_data_state, StrEnum)
|
||||
if isinstance(self._panda_data_state, PandaDataEvents)
|
||||
else self._panda_data_state
|
||||
)
|
||||
|
||||
@panda_data_state.setter
|
||||
def panda_data_state(self, value: LITERAL_PANDA_COMMANDS | str) -> None:
|
||||
def panda_data_state(self, value: PandaDataEvents | str) -> None:
|
||||
"""Set the current state of the data acquisition on the PandaBox."""
|
||||
with self._lock:
|
||||
self._panda_data_state = value
|
||||
@@ -400,7 +396,7 @@ class PandaBox(PSIDeviceBase):
|
||||
# expected safe state of the data receiving loop from the PandaBox and was added
|
||||
# in addition to the existing READY, START, FRAME, END events created from the existing
|
||||
# PandaBox data messages.
|
||||
client.send(Disarm()) # Ensure we disarm at the end
|
||||
client.send(self._disarm()) # Ensure we disarm at the end
|
||||
self.data_thread_run_event.clear()
|
||||
self._run_status_callbacks(PandaDataEvents.DISARMED)
|
||||
|
||||
@@ -441,7 +437,7 @@ class PandaBox(PSIDeviceBase):
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
|
||||
def _run_data_callbacks(
|
||||
self, data: LITERAL_PANDA_DATA_EVENTS, event_type: LITERAL_PANDA_DATA_EVENTS
|
||||
self, data: LITERAL_PANDA_DATA, event_type: LITERAL_PANDA_DATA_EVENTS
|
||||
) -> None:
|
||||
"""
|
||||
Placeholder method to run data callbacks for received PandaBox data.
|
||||
@@ -451,13 +447,13 @@ class PandaBox(PSIDeviceBase):
|
||||
intended to be called multiple times during a data acquisition.
|
||||
|
||||
Args:
|
||||
data (LITERAL_PANDA_DATA_EVENTS): The data received from the PandaBox.
|
||||
data (LITERAL_PANDA_DATA): The data received from the PandaBox.
|
||||
event_type (LITERAL_PANDA_DATA_EVENTS): The type of data received. This can be
|
||||
"ready", "start", "frame", or "end".
|
||||
"""
|
||||
with self._lock:
|
||||
for cb_info in self._data_callbacks.values():
|
||||
callback: Callable[[LITERAL_PANDA_DATA_EVENTS], None] = cb_info["callback"]
|
||||
callback: Callable[[LITERAL_PANDA_DATA], None] = cb_info["callback"]
|
||||
cb_data_type: LITERAL_PANDA_DATA_EVENTS = cb_info["data_type"]
|
||||
if cb_data_type == event_type:
|
||||
callback(data)
|
||||
@@ -571,13 +567,13 @@ class PandaBox(PSIDeviceBase):
|
||||
|
||||
def _get_signal_names_allowed_for_capture(self) -> list[str]:
|
||||
"""Utility method to get a list of all signal keys that CAN BE CONFIGURED for capture on the PandaBox."""
|
||||
ret = self.send_command(Raw("*CAPTURE.*?"))
|
||||
ret = self.send_command(self.send_raw("*CAPTURE.*?"))
|
||||
# TODO check proper unpacking of returned keys
|
||||
return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")]
|
||||
|
||||
def _get_signal_names_configured_for_capture(self) -> list[str]:
|
||||
"""Utility method to get a list of all signal keys thar ARE CURRENTLY CONFIGURED for capture on the PandaBox."""
|
||||
ret = self.send_command(Raw("*CAPTURE?"))
|
||||
ret = self.send_command(self.send_raw("*CAPTURE?"))
|
||||
return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")]
|
||||
|
||||
def _compile_frame_data_to_dict(self, frame_data: FrameData) -> dict[str, Any]:
|
||||
@@ -617,8 +613,8 @@ class PandaBox(PSIDeviceBase):
|
||||
|
||||
def _arm(self) -> None:
|
||||
"""Arm the PandaBox device."""
|
||||
self._send_command(Arm())
|
||||
self._send_command(pbc.Arm())
|
||||
|
||||
def _disarm(self) -> None:
|
||||
"""Disarm the PandaBox device."""
|
||||
self._send_command(Disarm())
|
||||
self._send_command(pbc.Disarm())
|
||||
|
||||
Reference in New Issue
Block a user