diff --git a/ophyd_devices/devices/panda_box/panda_box.py b/ophyd_devices/devices/panda_box/panda_box.py index 93ba8e9..db80198 100644 --- a/ophyd_devices/devices/panda_box/panda_box.py +++ b/ophyd_devices/devices/panda_box/panda_box.py @@ -34,6 +34,7 @@ from __future__ import annotations import os import threading +import time import uuid from enum import StrEnum from typing import TYPE_CHECKING, Any, Callable, TypeAlias, TypedDict @@ -41,7 +42,6 @@ from typing import TYPE_CHECKING, Any, Callable, TypeAlias, TypedDict import pandablocks.commands as pbc from bec_lib import bec_logger from ophyd import Component as Cpt -from ophyd import Staged from ophyd.status import WaitTimeoutError from pandablocks.blocking import BlockingClient from pandablocks.responses import Data, EndData, FrameData, ReadyData, StartData @@ -227,6 +227,9 @@ class PandaBox(PSIDeviceBase): # Acquisition group of the PandaBox data. self._acquisition_group = "panda" + # Timeouts for wait operations in seconds + self._stage_timeout_in_s = 3 + def on_init(self): """Initialize the PandaBox device. This method can be used to perform any additional initialization logic.""" super().on_init() @@ -528,7 +531,7 @@ class PandaBox(PSIDeviceBase): # or re-evaluate the implemented logic as these methods attempt to partially # setup the PandaBox for data acquisition. - def wait_for_connection(self, timouet: float | None = None) -> bool: + def wait_for_connection(self, timeout: float | None = None) -> bool: ret = self.send_raw("*IDN?") return True @@ -576,7 +579,7 @@ class PandaBox(PSIDeviceBase): status = StatusBase(obj=self) self.add_status_callback(status=status, success=[PandaState.DISARMED], failure=[]) try: - status.wait(timeout=3) + status.wait(timeout=self._stage_timeout_in_s) except WaitTimeoutError: logger.error(f"PandaBox {self.name} did not disarm before staging.") # pylint: disable=raise-from-missing @@ -619,7 +622,13 @@ class PandaBox(PSIDeviceBase): def _get_signal_names_configured_for_capture(self) -> list[str]: """Utility method to get a list of all signal keys thar ARE CURRENTLY CONFIGURED for capture on the PandaBox.""" ret = self.send_raw("*CAPTURE?") - return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")] + signal_names = [] + for value in ret: + if value.strip("."): # Ignore empty values "." + string_parts = value.strip("!").split(" ") + base_name = string_parts[0] # Get base name without capture config + _ = [signal_names.append(f"{base_name}.{key}") for key in string_parts[1:]] + return signal_names def convert_frame_data(self, frame_data: FrameData) -> dict[str, Any]: """ @@ -640,7 +649,7 @@ class PandaBox(PSIDeviceBase): mapped_key = [self.signal_alias.get(key, key) for key in keys] # Initialize lists for each key, consider adjusting names to match for k in mapped_key: - out[k] = {"value": []} # Timestamp? + out[k] = {"value": [], "timestamp": time.time()} for entry in data: for i, k in enumerate(mapped_key): out[k]["value"].append(entry[i]) # Fill values from data diff --git a/ophyd_devices/devices/panda_box/utility_scripts.py b/ophyd_devices/devices/panda_box/utility_scripts.py index 58a5f40..26df29e 100644 --- a/ophyd_devices/devices/panda_box/utility_scripts.py +++ b/ophyd_devices/devices/panda_box/utility_scripts.py @@ -1,3 +1,4 @@ +# skip-file """ Module with utility scripts to run on the PandaBox device. diff --git a/tests/test_panda.py b/tests/test_panda.py new file mode 100644 index 0000000..5354a67 --- /dev/null +++ b/tests/test_panda.py @@ -0,0 +1,282 @@ +# skip-file +from unittest import mock + +import numpy as np +import pytest +from ophyd import Staged + +from ophyd_devices import StatusBase +from ophyd_devices.devices.panda_box.panda_box import FrameData, PandaBox, PandaState +from ophyd_devices.devices.panda_box.utils import ( + PANDA_AVAIL_PCAP_BLOCKS, + PANDA_AVAIL_PCAP_CAPTURE_FIELDS, + get_pcap_capture_fields, +) + + +@pytest.fixture +def _signal_aliases(): + return {"FMC_IN.VAL1.Value": "my_signal_1", "FMC_IN.VAL2.Mean": "my_signal_2"} + + +@pytest.fixture +def panda_box(_signal_aliases): + return PandaBox(name="panda_box", host="localhost", signal_alias=_signal_aliases) + + +def test_panda_box_init(panda_box, _signal_aliases): + """Test initialization of PandaBox, including default signal aliases.""" + assert panda_box.name == "panda_box" + assert panda_box.host == "localhost" + all_signal_names = [name for name, _ in panda_box.data.signals] + for block in PANDA_AVAIL_PCAP_BLOCKS: + for field in PANDA_AVAIL_PCAP_CAPTURE_FIELDS: + signal_name = f"{block}.{field}" + if signal_name in ("FMC_IN.VAL1.Value", "FMC_IN.VAL2.Mean"): + # These signals should be renamed + assert _signal_aliases[signal_name] in all_signal_names + continue + assert signal_name in all_signal_names, f"Missing signal: {signal_name}" + + +def test_panda_wait_for_connection(panda_box): + """Test that wait_for_connection can be called without error.""" + with mock.patch.object(panda_box, "send_raw") as mock_send_raw: + mock_send_raw.return_value = "OK" + panda_box.wait_for_connection(timeout=1) + mock_send_raw.assert_called_with("*IDN?") + + +def test_panda_on_connected(panda_box): + """Test that on_connected sets the connected flag.""" + with mock.patch.object(panda_box, "data_thread") as mock_data_thread: + panda_box.on_connected() + mock_data_thread.start.assert_called_once() + assert len(panda_box._data_callbacks) == 1 + cb_id = list(panda_box._data_callbacks.keys())[0] + assert panda_box._data_callbacks[cb_id]["callback"] == panda_box._receive_frame_data + assert panda_box._data_callbacks[cb_id]["data_type"] == PandaState.FRAME + + # Remove callback + panda_box.remove_data_callback(cb_id) + assert len(panda_box._data_callbacks) == 0, "Data callback was not removed" + + +def test_panda_add_status_callback(panda_box): + """Test that add_status_callback adds proper status callbacks, and resolves them correctly.""" + + assert panda_box.panda_state == PandaState.DISARMED, "Initial PandaBox state should be DISARMED" + status = StatusBase(obj=panda_box) + # I. Resolve immediately, should be successful + panda_box.add_status_callback(status=status, success=PandaState.DISARMED, failure=[]) + status.wait(timeout=1) + assert status.done, "Status should be done" + assert status.success, "Status should be successful" + assert len(panda_box._status_callbacks) == 0, "Status callback should never be added" + + # II. Resolve immediately, but with failure state, should be unsuccessful + status = StatusBase(obj=panda_box) + panda_box.add_status_callback(status=status, success=[], failure=PandaState.DISARMED) + with pytest.raises(RuntimeError): + status.wait(timeout=1) + assert status.done, "Status should be done" + assert not status.success, "Status should be unsuccessful" + assert len(panda_box._status_callbacks) == 0, "Status callback should never be added" + + # III. Resolve status in success + status = StatusBase(obj=panda_box) + panda_box.add_status_callback(status=status, success=PandaState.READY, failure=[]) + assert len(panda_box._status_callbacks) == 1, "Status callback should be added" + panda_box._run_status_callbacks(PandaState.START) + assert not status.done, "Status should not be done" + assert not status.success, "Status should not be successful" + panda_box._run_status_callbacks(PandaState.READY) + status.wait(timeout=1) + assert status.done, "Status should be done" + assert status.success, "Status should be successful" + + # IV. Resolve status in failure + status = StatusBase(obj=panda_box) + panda_box.add_status_callback(status=status, success=[PandaState.END], failure=PandaState.START) + panda_box._run_status_callbacks(PandaState.START) + with pytest.raises(RuntimeError): + status.wait(timeout=1) + assert status.done, "Status should be done" + assert not status.success, "Status should be unsuccessful" + + +def test_panda_receive_frame_data(panda_box, _signal_aliases): + """Test that _receive_frame_data processes data and updates signals.""" + # Create a mock frame data dict + data = np.array( + [ + (np.float64(0), np.float64(10)), + (np.float64(1), np.float64(11)), + (np.float64(2), np.float64(12)), + ], + dtype=[("FMC_IN.VAL1.Value", "