From d539ae5e98ea1fee7242876fc601fa45ad5c0061 Mon Sep 17 00:00:00 2001 From: x10da Date: Mon, 11 May 2026 17:35:25 +0200 Subject: [PATCH] fix(timepix): Bugfixes and tests after adapting to the revised TimepixFly backend --- superxas_bec/devices/timepix/timepix.py | 239 ++++------ .../timepix_fly_client/timepix_fly_backend.py | 11 +- .../timepix_fly_client/timepix_fly_client.py | 12 +- tests/tests_devices/test_timepix.py | 426 ++++++++++++++++++ .../tests_devices/test_timepix_fly_backend.py | 205 ++++++++- 5 files changed, 740 insertions(+), 153 deletions(-) create mode 100644 tests/tests_devices/test_timepix.py diff --git a/superxas_bec/devices/timepix/timepix.py b/superxas_bec/devices/timepix/timepix.py index ca2a8af..1769245 100644 --- a/superxas_bec/devices/timepix/timepix.py +++ b/superxas_bec/devices/timepix/timepix.py @@ -19,11 +19,14 @@ from bec_lib.file_utils import get_full_path from bec_lib.logger import bec_logger from ophyd import ADBase from ophyd import Component as Cpt -from ophyd import EpicsSignalRO, EpicsSignalWithRBV +from ophyd import Signal from ophyd_devices import ( AsyncSignal, CompareStatus, DeviceStatus, + EpicsSignalRO, + EpicsSignalWithRBV, + ExceptionStatus, FileEventSignal, PreviewSignal, StatusBase, @@ -134,6 +137,13 @@ class FILEWRITEMODE(int, enum.Enum): STREAM = 2 +class IMAGEMODE(int, enum.Enum): + """Values for ImageMode PV""" + + SINGLE = 0 + MULTIPLE = 1 + + def load_pixel_map_from_json(file_path: str) -> PixelMap: """Load a pixel map from a JSON file. @@ -182,112 +192,36 @@ DETECTOR_SHAPE = (512, 1024) # Shape of the TimePix detector class Timepix(PSIDeviceBase, TimePixControl): """ - TimePix class. The IOC is running with the prefix 'X10DA-ES-TPX1:'. - The TimePixFly backend is running on p4-0017.psi.ch. Please check the port from the app - running in headless server mode. The backend_rest url can for instance be 'p4-0017.psi.ch:8452'. - The hostname needs to be set to the name of this machine, e.h. x10da-bec-001.psi.ch. + Integration of the Timepix detector for the SuperXAS beamline. The control interface is + given through the ASItpxCam EPICS interface, which is implemented in the TimePixControl class. + The IOC is running with the prefix 'X10DA-ES-TPX1:'. + + The backend (TimePixFly) for XES data is running on p6-0008.psi.ch. Please check that the + backend is running in headless server mode. It must be started with an appropriate REST API URL + and socket port that are accessible from the BEC server, i.e. with option -cP6-0008.psi.ch:8452. """ MIN_DETECTOR_READOUT_TIME = 2.1e-3 # Minimum readout time in seconds for ASI TimePix detector _DETECTOR_SHAPE = DETECTOR_SHAPE - USER_ACCESS = [ - "troin", - "troistep", - "get_pixel_map", - "set_pixel_map", - "set_pixel_map_from_json_file", - "set_enable_xes", - ] + # fmt: off + USER_ACCESS = ["troin", "troistep", "get_pixel_map", "set_pixel_map", "set_pixel_map_from_json_file", "set_enable_xes", "set_enable_image_writing"] + xes_data = Cpt(AsyncSignal, name="xes_data", ndim=2, max_size=1000, doc="Full XES data, 2D image with energypoints vs time bins.") + xes_spectra = Cpt(AsyncSignal, name="xes_spectra", ndim=1, max_size=1000, doc="1D spectra, integrated over energy bins.") + xes_energy_1 = Cpt(AsyncSignal, name="xes_energy_1", ndim=1, max_size=1000, doc="1D time spectra for energy bin 1.") + xes_energy_2 = Cpt(AsyncSignal, name="xes_energy_2", ndim=1, max_size=1000, doc="1D time spectra for energy bin 2.") + tds_period = Cpt(AsyncSignal, name="tds_period", ndim=0, async_update={"type": "add", "max_shape": [None]}, max_size=1000, doc="TDS period recorded by the TimePixFly backend detector.") + total_periods = Cpt(AsyncSignal, name="total_periods", ndim=0, async_update={"type": "add", "max_shape": [None]}, max_size=1000, doc="Total TDS periods recorded by the TimePixFly backend detector.") + total_events = Cpt(AsyncSignal, name="total_events", ndim=0, async_update={"type": "add", "max_shape": [None]}, max_size=1000, doc="Total events recorded by the TimePixFly backend detector.") - xes_data = Cpt( - AsyncSignal, - name="xes_data", - ndim=2, - max_size=1000, - doc="Full XES data, 2D image with energypoints vs time bins.", - ) - xes_spectra = Cpt( - AsyncSignal, - name="xes_spectra", - ndim=1, - max_size=1000, - doc="1D spectra, integrated over energy bins.", - ) - xes_energy_1 = Cpt( - AsyncSignal, - name="xes_energy_1", - ndim=1, - max_size=1000, - doc="1D time spectra for energy bin 1.", - ) - xes_energy_2 = Cpt( - AsyncSignal, - name="xes_energy_2", - ndim=1, - max_size=1000, - doc="1D time spectra for energy bin 2.", - ) - tds_period = Cpt( - AsyncSignal, - name="tds_period", - ndim=0, - async_update={"type": "add", "max_shape": [None]}, - max_size=1000, - doc="TDS period recorded by the TimePixFly backend detector.", - ) - total_periods = Cpt( - AsyncSignal, - name="total_periods", - ndim=0, - async_update={"type": "add", "max_shape": [None]}, - max_size=1000, - doc="Total TDS periods recorded by the TimePixFly backend detector.", - ) - total_events = Cpt( - AsyncSignal, - name="total_events", - ndim=0, - async_update={"type": "add", "max_shape": [None]}, - max_size=1000, - doc="Total events recorded by the TimePixFly backend detector.", - ) + preview = Cpt(PreviewSignal, name="preview", ndim=2, num_rotation_90=1, doc="Preview signal of the TimePix detector.") - preview = Cpt( - PreviewSignal, - name="preview", - ndim=2, - num_rotation_90=1, - doc="Preview signal of the TimePix detector.", - ) + static_spectra = Cpt(AsyncSignal, name="static_spectra", ndim=1, max_size=1000, acquisition_group="monitored", async_update={"type": "add", "max_shape": [None, DETECTOR_SHAPE[0]]}, doc="Spectra signal of the TimePix detector.") - static_spectra = Cpt( - AsyncSignal, - name="static_spectra", - ndim=1, - max_size=1000, - acquisition_group="monitored", - async_update={"type": "add", "max_shape": [None, DETECTOR_SHAPE[0]]}, - doc="Spectra signal of the TimePix detector.", - ) - - xes_data_accumulated_1 = Cpt( - AsyncSignal, - name="xes_accumulated_energy_1", - ndim=1, - max_size=1000, - doc="1D time spectra for energy bin 2.", - ) - xes_data_accumulated_2 = Cpt( - AsyncSignal, - name="xes_accumulated_energy_2", - ndim=1, - max_size=1000, - doc="1D time spectra for energy bin 2.", - ) - file_event = Cpt( - FileEventSignal, name="file_event", doc="File event signal for TimePix detector." - ) + xes_data_accumulated_1 = Cpt(AsyncSignal, name="xes_accumulated_energy_1", ndim=1, max_size=1000, doc="1D time spectra for energy bin 2.") + xes_data_accumulated_2 = Cpt(AsyncSignal, name="xes_accumulated_energy_2", ndim=1, max_size=1000, doc="1D time spectra for energy bin 2.") + file_event = Cpt(FileEventSignal, name="file_event", doc="File event signal for TimePix detector.") + # fmt: on def __init__( self, @@ -511,6 +445,15 @@ class Timepix(PSIDeviceBase, TimePixControl): """ self.enable_xes = enable + def set_enable_image_writing(self, enable: bool) -> None: + """ + Enable or disable image writing to file through the HDF5 plugin. + + Args: + enable (bool): Whether to enable image writing. + """ + self.hdf.enable.set(1 if enable else 0).wait(timeout=self._pv_timeout) + @property def enable_xes(self) -> bool: """Get whether XES data acquisition is enabled.""" @@ -610,6 +553,13 @@ class Timepix(PSIDeviceBase, TimePixControl): self.cam.tdc2_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout) self.cam.tdc2_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout) + def wait_for_connection( + self, all_signals: bool = False, timeout: float | None = None, **kwargs + ): + super().wait_for_connection(all_signals, timeout) + # Prepare backend for TimePixFly + self.backend.on_connected() + def on_connected(self) -> None: """ Called after the device is connected and its signals are connected. @@ -624,6 +574,8 @@ class Timepix(PSIDeviceBase, TimePixControl): self.cam.exposure_mode.set(EXPOSUREMODE.TIMED).wait(timeout=self._pv_timeout) # Reset array counter on connect self.cam.array_counter.set(0).wait(timeout=self._pv_timeout) + # Set image mode to multiple + self.cam.image_mode.set(IMAGEMODE.MULTIPLE).wait(timeout=self._pv_timeout) # ------------------ # Prepare file writing through AD HDF5 plugin @@ -639,8 +591,6 @@ class Timepix(PSIDeviceBase, TimePixControl): # Prepare TimePixFly backend # ----------------- - # Prepare backend for TimePixFly - self.backend.on_connected() # Register the callback for processing data received by the backend self.backend.add_callback(self.msg_buffer_callback) self._poll_thread.start() @@ -693,18 +643,18 @@ class Timepix(PSIDeviceBase, TimePixControl): file_path = "/".join(self._full_path.split("/")[:-1]) file_name = self._full_path.split("/")[-1] self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks - # self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin self.hdf.file_path.set(file_path).wait(5) self.hdf.file_name.set(file_name).wait(5) # Setup file writing for the total expected number of images self.hdf.num_capture.set(self._n_images).wait(5) - self.hdf.capture.put(1) - self.file_event.put( - file_path=self._full_path, - done=False, - successful=False, - hinted_h5_entries={"data": "/entry/data/data"}, - ) + if self.hdf.enable.get() == "Enable": + self.hdf.capture.put(1) + self.file_event.put( + file_path=self._full_path, + done=False, + successful=False, + hinted_h5_entries={"data": "/entry/data/data"}, + ) # ------------------------- # XES specific staging @@ -714,8 +664,8 @@ class Timepix(PSIDeviceBase, TimePixControl): TRoiStep=self.troistep, TRoiN=self.troin, output_uri=f"tcp:{self.backend.hostname}:{self.backend.socket_port}", - save_interval=int(131000 / 5) - 5, # Save interval in 131kHz units, - ) + save_interval=int(131000 / 5) - 5, + ) # Save interval in 131kHz units, logger.debug(f"Current TimePixFly configuration: {other_config}") pixel_map = self.pixel_map self.backend.on_stage(other_config=other_config, pixel_map=pixel_map) @@ -740,11 +690,15 @@ class Timepix(PSIDeviceBase, TimePixControl): status_camera = CompareStatus( self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout ) - status_writer = CompareStatus( - self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout - ) - status = status_camera & status_writer - + status_writer = None + if self.hdf.enable.get() == "Enable": + status_writer = CompareStatus( + self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout + ) + if status_writer: + status = status_camera & status_writer + else: + status = status_camera self.cancel_on_stop(status) return status @@ -799,12 +753,15 @@ class Timepix(PSIDeviceBase, TimePixControl): def on_complete(self) -> DeviceStatus | StatusBase | None: """Called to inquire if a device has completed a scans.""" # Status Camera - status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE) + return_status = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE) # Status Writer st1 = CompareStatus(self.hdf.capture, ACQUIRESTATUS.DONE) - st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE) - status_written_images = CompareStatus(self.hdf.num_captured, self._n_images) - status_writer = st1 & st2 & status_written_images + status_writer = None + if self.hdf.enable.get() == "Enable": + st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE) + st3 = ExceptionStatus(self.hdf.write_status, 0, operation="!=") + status_written_images = CompareStatus(self.hdf.num_captured, self._n_images) + status_writer = st1 & st2 & status_written_images & st3 # Status Backend status_backend = None @@ -813,9 +770,9 @@ class Timepix(PSIDeviceBase, TimePixControl): status_backend = self.backend.on_complete(status=status_backend) # Combine the statuses if status_backend is not None: - return_status = status_backend & status_camera & status_writer - else: - return_status = status_camera & status_writer + return_status = status_backend & return_status + if status_writer is not None: + return_status = return_status & status_writer return_status.add_callback(self._complete_callback) self.cancel_on_stop(return_status) @@ -823,20 +780,24 @@ class Timepix(PSIDeviceBase, TimePixControl): def _complete_callback(self, status: CompareStatus) -> None: """Callback for when the device completes a scan.""" + if ( + self.hdf.enable.get() != "Enable" + ): # TODO: Not sure if we should support disabled file writing. + return if status.success: self.file_event.put( - file_path=self._full_path, # pylint: disable:protected-access + file_path=self._full_path, done=True, successful=True, hinted_h5_entries={"data": "/entry/data/data"}, - ) + ) # pylint: disable:protected-access else: self.file_event.put( - file_path=self._full_path, # pylint: disable:protected-access + file_path=self._full_path, done=True, successful=False, hinted_h5_entries={"data": "/entry/data/data"}, - ) + ) # pylint: disable:protected-access def on_stop(self) -> None: """Called when the device is stopped.""" @@ -849,11 +810,14 @@ class Timepix(PSIDeviceBase, TimePixControl): def on_destroy(self): """Cleanup method to stop the device and clean up resources.""" - self.cam.acquire.put(0) - self.hdf.capture.put(0) - self._poll_thread_kill_event.set() - self.backend.on_stop() - self.backend.on_destroy() + try: + self.cam.acquire.put(0) + self.hdf.capture.put(0) + self._poll_thread_kill_event.set() + self.backend.on_stop() + self.backend.on_destroy() + except Exception: + logger.warning(f"Failed to destroy {self.name}.") # pylint: disable=protected-access @@ -862,9 +826,9 @@ if __name__ == "__main__": # pragma: no cover timepix = Timepix( name="timepix", prefix="X10DA-ES-TPX1:", - backend_rest_url="P6-0008.psi.ch:8452", # "P4-0017.psi.ch:8452", + backend_rest_url="P6-0008.psi.ch:8452", hostname="x10da-bec-001.psi.ch", - ) + ) # "P4-0017.psi.ch:8452", try: # timepix.wait_for_connection(all_signals=True, timeout=10) timepix.on_connected() @@ -877,11 +841,8 @@ if __name__ == "__main__": # pragma: no cover ) time.sleep(0.5) timepix.scan_info.msg.scan_parameters.update( - { - "exp_time": exp_time, # Set exposure time to 1 second for testing - "frames_per_trigger": frames_per_trigger, # Set frames per trigger to 5 for testing - } - ) + {"exp_time": exp_time, "frames_per_trigger": frames_per_trigger} + ) # Set exposure time to 1 second for testing # Set frames per trigger to 5 for testing timepix.stage() logger.warning(f"Timepix on stage done") timepix.pre_scan() diff --git a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py index 54ff054..bc7f8bd 100644 --- a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py +++ b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py @@ -12,7 +12,6 @@ hooks for all the relevant ophyd interface, 'on_stage', from __future__ import annotations import json -import signal import socket import threading import time @@ -81,14 +80,14 @@ class TimepixFlyBackend: ###### Hooks for the PSIDeviceBase interface ###### ################################################### - def on_connected(self): + def on_connected(self, timeout: float = 10): """Called if it is ensured that the device is connected.""" time_started = time.time() logger.info("Connecting to Timepix Fly backend...") try: - self.timepix_fly_client.on_connected() + self.timepix_fly_client.on_connected(timeout=timeout / 2) status = self.start_data_server() - status.wait(timeout=5) + status.wait(timeout=timeout / 2) except Exception: # pylint: disable=broad-except content = traceback.format_exc() logger.error(f"Error starting data server: {content}") @@ -239,6 +238,7 @@ class TimepixFlyBackend: def on_destroy(self): """Hook for on_destroy logic.""" + time_started = time.time() self.timepix_fly_client.shutdown() self._data_thread_shutdown_event.set() if self._data_thread is not None and self._data_thread.is_alive(): @@ -255,6 +255,9 @@ class TimepixFlyBackend: except Exception: content = traceback.format_exc() logger.error(f"Error closing socket server: {content}") + logger.info( + f"Timepix Fly backend destroyed and resources cleaned up after {time.time() - time_started:.3f} seconds." + ) def on_stop(self): """Hook for on_stop logic.""" diff --git a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_client.py b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_client.py index e9b95f4..e45c4aa 100644 --- a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_client.py +++ b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_client.py @@ -88,7 +88,7 @@ class TimepixFlyClient: ### Utility Methods ### ############################# - def on_connected(self) -> None: + def on_connected(self, timeout: float = 5) -> None: """ Called when the client is connected to the TimePix server. This method can be overridden to perform actions when the client connects. @@ -96,7 +96,7 @@ class TimepixFlyClient: try: self.stop_running_collection() self.connect() - self.wait_for_connection(timeout=5) + self.wait_for_connection(timeout=timeout) except Exception: content = traceback.format_exc() @@ -270,16 +270,18 @@ class TimepixFlyClient: continue if status in success: dev_status.set_finished() - logger.debug(f"Status callback finished in succes: {status.value}") + logger.debug(f"Status callback finished in success: {status.value}") self._status_callbacks.pop(cb_id) elif status in error: try: last_error = self.last_error() raise TimePixStatusError( - f"TimePixFly Backend state '{status.value}' is in list of specified errors {error}. Last error message: {last_error.message}" + f"TimePixFly state '{status.value}': {last_error.message}" ) except Exception as e: - logger.error(f"Error in status callback from TimepixFly Backend: {e}") + logger.error( + f"Error in status callback for '{status.value}' from TimepixFly backend: {e}" + ) dev_status.set_exception(e) self._status_callbacks.pop(cb_id) # Reset the _started flag if the status is in CONFIG. diff --git a/tests/tests_devices/test_timepix.py b/tests/tests_devices/test_timepix.py new file mode 100644 index 0000000..2d426e8 --- /dev/null +++ b/tests/tests_devices/test_timepix.py @@ -0,0 +1,426 @@ +"""Unit tests for the Timepix device with a mocked backend.""" + +from __future__ import annotations + +import threading +from types import SimpleNamespace +from unittest import mock + +import numpy as np +import ophyd +import pytest +from bec_server.device_server.tests.utils import DMMock +from ophyd import DeviceStatus, StatusBase +from ophyd_devices.tests.utils import MockPV, patch_dual_pvs + +from superxas_bec.devices.timepix.timepix import ( + ACQUIRESTATUS, + DATASOURCE, + EXPOSUREMODE, + FILEWRITEMODE, + TRIGGERMODE, + TRIGGERSOURCE, + TDCEdge, + TDCOuput, + Timepix, +) +from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus +from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import ( + NetAddresses, + PixelMap, +) + +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name + + +class FakeBackendClient: + """Backend client double that can finish status callbacks on demand.""" + + def __init__(self): + self.status = TimePixFlyStatus.CONFIG + self._status_callbacks = {} + + def add_status_callback(self, status, success, error, run=True): + """Register status callback with optional immediate completion.""" + if run and self.status in success: + status.set_finished() + return + self._status_callbacks[id(status)] = (status, success, error) + + def emit_status(self, status_value: TimePixFlyStatus): + """Resolve tracked status objects with a simulated backend state.""" + self.status = status_value + for cb_id, (status, success, error) in list(self._status_callbacks.items()): + with status._lock: + if status.done: + self._status_callbacks.pop(cb_id, None) + continue + if status_value in success: + status.set_finished() + self._status_callbacks.pop(cb_id, None) + elif status_value in error: + status.set_exception(RuntimeError(f"backend entered {status_value.value}")) + self._status_callbacks.pop(cb_id, None) + + def get_net_addresses(self): + """Return a deterministic backend raw-data address.""" + return NetAddresses( + control="127.0.0.1:8452", address="127.0.0.1:8451", server="127.0.0.1:8080" + ) + + +class FakeBackend: + """Minimal backend double used to isolate Timepix from backend integration.""" + + def __init__(self, *args, **kwargs): + self.hostname = kwargs.get("hostname") or "localhost" + self.socket_port = kwargs.get("socket_port", 9876) + self.timepix_fly_client = FakeBackendClient() + self.on_connected = mock.Mock() + self.on_stage = mock.Mock() + self.on_stop = mock.Mock() + self.on_destroy = mock.Mock() + self.add_callback = mock.Mock() + self._trigger_status = StatusBase() + self._trigger_status.set_finished() + self._complete_status = StatusBase() + self._complete_status.set_finished() + + def on_trigger(self): + """Return a backend-prepared trigger status.""" + return self._trigger_status + + def on_trigger_finished(self): + """Return the status that resolves when acquisition is complete.""" + return self._complete_status + + def on_complete(self, status=None): + """Return a backend completion status.""" + if status is None: + return self._complete_status + status.set_finished() + return status + + +def _finished_status(device=None): + """Create a finished status for mocked signal set operations.""" + status = DeviceStatus(device=device) + status.set_finished() + return status + + +def _force_signal_value(signal, value): + """Set a mocked PV-backed signal value, including read-only EPICS signals.""" + if hasattr(signal, "_read_pv"): + signal._read_pv.mock_data = value + return + signal.put(value) + + +def _message_value(signal): + """Extract the signal payload from a BEC message signal.""" + msg = signal.get() + return msg.signals[signal.name]["value"] + + +@pytest.fixture(scope="function") +def timepix(): + """Timepix device with mocked EPICS signals and a fully mocked backend.""" + backend = FakeBackend() + scan_info = SimpleNamespace( + msg=SimpleNamespace( + scan_name="step_scan", + scan_parameters={"exp_time": 0.1, "frames_per_trigger": 2}, + num_points=3, + ) + ) + with ( + mock.patch.object(ophyd, "cl") as mock_cl, + mock.patch("superxas_bec.devices.timepix.timepix.TimepixFlyBackend", return_value=backend), + ): + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = Timepix( + name="timepix", + prefix="X10DA-ES-TPX1:", + backend_rest_url="localhost:8452", + hostname="localhost", + socket_port=9876, + scan_info=scan_info, + device_manager=DMMock(), + ) + patch_dual_pvs(dev) + dev.backend = backend + dev._poll_thread = mock.Mock() + for walk in dev.walk_signals(): + signal = walk.item + if hasattr(signal, "set") and hasattr(signal, "put"): + signal.set = mock.Mock(side_effect=lambda value, _sig=signal, **_kw: (_force_signal_value(_sig, value), _finished_status(_sig))[1]) # type: ignore[method-assign] + yield dev + dev._poll_thread_kill_event.set() + + +@pytest.fixture(scope="function") +def pixel_map(): + """Small valid pixel map used for stage tests.""" + return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]]) + + +def test_timepix_on_connected_configures_signals_and_registers_callback(timepix): + """Connected hook should configure camera, file writer, and backend callback.""" + timepix.on_connected() + + assert timepix.cam.tdc1_enable.get() == 1 + assert timepix.cam.tdc2_enable.get() == 1 + assert timepix.cam.raw_enable.get() == 1 + assert timepix.cam.tdc1_edge.get() == TDCEdge.RISING + assert timepix.cam.tdc1_output.get() == TDCOuput.ALL_CHANNELS + assert timepix.cam.trigger_mode.get() == TRIGGERMODE.INTERNAL + assert timepix.cam.trigger_source.get() == TRIGGERSOURCE.HDMI1_1 + assert timepix.cam.exposure_mode.get() == EXPOSUREMODE.TIMED + assert timepix.cam.array_counter.get() == 0 + assert timepix.hdf.enable.get() == "1" + assert timepix.hdf.file_write_mode.get() == FILEWRITEMODE.STREAM.value + assert timepix.hdf.auto_save.get() == 1 + assert timepix.cam.array_callbacks.get() == 1 + timepix.backend.add_callback.assert_called_once_with(timepix.msg_buffer_callback) + timepix._poll_thread.start.assert_called_once() + + +def test_timepix_on_stage_configures_camera_writer_and_mocked_backend(timepix, pixel_map): + """Stage should configure camera settings and forward config to the backend.""" + timepix._pixel_map = pixel_map + + with ( + mock.patch( + "superxas_bec.devices.timepix.timepix.get_full_path", + return_value="/tmp/timepix/test_scan.h5", + ), + ): + _force_signal_value(timepix.hdf.enable, "Enable") + timepix.on_stage() + + assert timepix.cam.acquire_time.get() == pytest.approx(0.1 - timepix._readout_time) + assert timepix.cam.acquire_period.get() == pytest.approx(0.1) + assert timepix.cam.num_images.get() == 2 + assert timepix.cam.data_source.get() == DATASOURCE.IMAGE + assert timepix.hdf.file_path.get() == "/tmp/timepix" + assert timepix.hdf.file_name.get() == "test_scan.h5" + assert timepix.hdf.num_capture.get() == 6 + assert timepix.hdf.capture.get() == 1 + assert timepix.cam.raw_file_template.get() == "" + assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451" + assert timepix.backend.on_stage.call_count == 1 + other_config = timepix.backend.on_stage.call_args.kwargs["other_config"] + assert other_config.output_uri == "tcp:localhost:9876" + assert other_config.TRoiStep == timepix.troistep + assert other_config.TRoiN == timepix.troin + assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map + file_event = timepix.file_event.get() + assert file_event.file_path == "/tmp/timepix/test_scan.h5" + assert file_event.done is False + assert file_event.successful is False + + +def test_timepix_on_stage_configures_mocked_backend(timepix, pixel_map): + """Stage should configure camera settings and forward config to the backend.""" + timepix._pixel_map = pixel_map + + with ( + mock.patch( + "superxas_bec.devices.timepix.timepix.get_full_path", + return_value="/tmp/timepix/test_scan.h5", + ), + ): + _force_signal_value(timepix.hdf.enable, "Disable") + # File event should not be emitted when hdf.enable is "Disable" + with mock.patch.object(timepix.file_event, "put") as mock_file_event_put: + timepix.hdf.capture.put(0) + timepix.on_stage() + mock_file_event_put.assert_not_called() + + assert timepix.hdf.capture.get() == 0 + assert timepix.cam.acquire_time.get() == pytest.approx(0.1 - timepix._readout_time) + assert timepix.cam.acquire_period.get() == pytest.approx(0.1) + assert timepix.cam.num_images.get() == 2 + assert timepix.cam.data_source.get() == DATASOURCE.IMAGE + assert timepix.cam.raw_file_template.get() == "" + assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451" + assert timepix.backend.on_stage.call_count == 1 + other_config = timepix.backend.on_stage.call_args.kwargs["other_config"] + assert other_config.output_uri == "tcp:localhost:9876" + assert other_config.TRoiStep == timepix.troistep + assert other_config.TRoiN == timepix.troin + assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map + + +def test_timepix_on_stage_without_xes_skips_backend_configuration(timepix): + """When XES is disabled, the backend-specific stage call should be skipped.""" + timepix.enable_xes = False + + with mock.patch( + "superxas_bec.devices.timepix.timepix.get_full_path", + return_value="/tmp/timepix/no_xes_scan.h5", + ): + timepix.on_stage() + + timepix.backend.on_stage.assert_not_called() + + +def test_timepix_on_trigger_combines_camera_and_backend_status(timepix): + """Trigger should arm the backend first and then drive the camera.""" + backend_done = StatusBase() + backend_done.set_finished() + backend_finished = StatusBase() + timepix.backend._trigger_status = backend_done + timepix.backend._complete_status = backend_finished + + status = timepix.on_trigger() + + assert isinstance(status, StatusBase) + assert timepix.cam.acquire.get() == 1 + _force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING) + _force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE) + backend_finished.set_finished() + status.wait(timeout=0.1) + assert status.done is True + assert status.success is True + + +def test_timepix_on_complete(timepix): + """Complete should wait for writer/backend completion and emit a success file event.""" + + # I. Case I. With hdf.enable "Enable", should wait for both writer and camera to be done before emitting file event + timepix.hdf.enable.put("Enable") + timepix._full_path = "/tmp/timepix/final_scan.h5" + timepix._n_images = 3 + _force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING) + timepix.hdf.capture.put(ACQUIRESTATUS.ACQUIRING) + timepix.hdf.write_file.put(ACQUIRESTATUS.ACQUIRING) + + status = timepix.on_complete() + + _force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE) + timepix.hdf.capture.put(ACQUIRESTATUS.DONE) + timepix.hdf.write_file.put(ACQUIRESTATUS.DONE) + timepix.hdf.write_status.put(0) + _force_signal_value(timepix.hdf.num_captured, 3) + status.wait(timeout=1) + + file_event = timepix.file_event.get() + assert file_event.file_path == "/tmp/timepix/final_scan.h5" + assert file_event.done is True + assert file_event.successful is True + + # II. Case II. With hdf.enable "Disable", should not wait for writer and only wait for camera to be done before emitting file event + timepix.hdf.enable.put("Disable") + timepix._n_images = 3 + _force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING) + timepix.hdf.capture.put(ACQUIRESTATUS.ACQUIRING) + timepix.hdf.write_file.put(ACQUIRESTATUS.ACQUIRING) + with mock.patch.object(timepix.file_event, "put") as mock_file_event_put: + status = timepix.on_complete() + _force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE) + status.wait(timeout=1) + mock_file_event_put.assert_not_called() + assert status.done is True + assert status.success is True + + +def test_timepix_msg_buffer_callback_updates_xes_signals(timepix): + """The backend message callback should populate all exported XES data signals.""" + start_frame = { + "type": "StartFrame", + "Mode": "TOA", + "TRoiStart": 0, + "TRoiStep": 1, + "TRoiN": 2, + "NumEnergyPoints": 8, + "save_interval": 1, + } + data_frames = [ + { + "type": "XesData", + "period": 131000, + "totalEvents": 36, + "TDSpectra": list(range(16)), + "beforeROI": 0, + "afterROI": 0, + } + ] + end_frame = {"type": "EndFrame", "error": "", "periods": 4} + timepix.troin = 2 + + timepix.msg_buffer_callback(start_frame, data_frames, end_frame) + + expected_xes = np.array( + [[0, 1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]], dtype=np.float32 + ) + np.testing.assert_array_equal(_message_value(timepix.xes_data), expected_xes) + np.testing.assert_array_equal( + _message_value(timepix.xes_spectra), np.array([28, 92], dtype=np.float32) + ) + np.testing.assert_array_equal( + _message_value(timepix.xes_energy_1), np.array([6, 38], dtype=np.float32) + ) + np.testing.assert_array_equal( + _message_value(timepix.xes_energy_2), np.array([22, 54], dtype=np.float32) + ) + np.testing.assert_array_equal( + _message_value(timepix.xes_data_accumulated_1), np.array([6, 38], dtype=np.float32) + ) + np.testing.assert_array_equal( + _message_value(timepix.xes_data_accumulated_2), np.array([22, 54], dtype=np.float32) + ) + assert _message_value(timepix.total_periods) == pytest.approx(4 / 131000) + np.testing.assert_array_equal(_message_value(timepix.tds_period), np.array([1.0])) + assert _message_value(timepix.total_events) == 36 + + +def test_timepix_on_stop_stops_camera_writer_and_backend(timepix): + """Stop should stop camera acquisition and delegate backend stop.""" + timepix.cam.acquire.put(1) + timepix.hdf.capture.put(1) + + timepix.on_stop() + + assert timepix.cam.acquire.get() == 0 + assert timepix.hdf.capture.get() == 0 + timepix.backend.on_stop.assert_called_once() + + +def test_timepix_on_destroy_cleans_up_backend(timepix): + """Destroy should stop polling and forward cleanup to the backend.""" + timepix.on_destroy() + + assert timepix._poll_thread_kill_event.is_set() is True + timepix.backend.on_stop.assert_called_once() + timepix.backend.on_destroy.assert_called_once() + + +def test_timepix_on_prescan_returns_correct_status(timepix): + """Prescan should return a finished status if the backend is ready.""" + # Case I. With hdf.enable "Enable", should wait for both writer and camera to be ready + timepix.hdf.enable._read_pv.mock_data = "Enable" + timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING + timepix.hdf.capture._read_pv.mock_data = ACQUIRESTATUS.DONE + # Should be combined status of writer and control + status = timepix.on_pre_scan() + assert status.done is False + timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE + assert status.done is False + timepix.hdf.capture._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING + status.wait(timeout=1) + assert status.done is True + assert status.success is True + + # Case II. With hdf.enable "Enable", should wait for both writer and camera to be ready + timepix.hdf.enable._read_pv.mock_data = "Disable" + timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING + status = timepix.on_pre_scan() + assert status.done is False + timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE + status.wait(timeout=1) + assert status.done is True + assert status.success is True diff --git a/tests/tests_devices/test_timepix_fly_backend.py b/tests/tests_devices/test_timepix_fly_backend.py index d1b6671..a9857c3 100644 --- a/tests/tests_devices/test_timepix_fly_backend.py +++ b/tests/tests_devices/test_timepix_fly_backend.py @@ -1,15 +1,210 @@ -"""This module tests the Timepix Fly backend functionality.""" +"""Unit tests for the Timepix Fly backend.""" from __future__ import annotations +from types import SimpleNamespace +from unittest import mock + import pytest +from ophyd import StatusBase from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend +from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import ( + TimePixFlyStatus, + TimePixStatusError, +) +from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import ( + NetAddresses, + OtherConfigModel, + PixelMap, +) + + +class FakeTimepixFlyClient: + """Minimal client double that can drive backend status callbacks.""" + + def __init__(self, rest_url: str, ws_url: str): + self.rest_url = rest_url + self.ws_url = ws_url + self.status = TimePixFlyStatus.CONFIG + self._status_callbacks = {} + self.error_message = "boom" + self.on_connected = mock.Mock() + self.shutdown = mock.Mock() + self.start = mock.Mock() + self.stop_running_collection = mock.Mock() + self.set_other_config = mock.Mock() + self.set_pixel_map = mock.Mock() + self.get_net_addresses = mock.Mock( + return_value=NetAddresses( + control="127.0.0.1:8452", address="127.0.0.1:8451", server="127.0.0.1:8080" + ) + ) + + def add_status_callback(self, status, success, error, run=True): + """Store callbacks and optionally resolve them immediately.""" + if run: + if self.status in success: + status.set_finished() + return + if self.status in error: + status.set_exception( + TimePixStatusError( + f"TimePixFly state '{self.status.value}': {self.error_message}" + ) + ) + return + self._status_callbacks[id(status)] = (status, success, error) + + def last_error(self): + """Return a lightweight error object.""" + return SimpleNamespace(message=self.error_message) + + def emit_status(self, status_value: TimePixFlyStatus): + """Resolve stored status callbacks as if a websocket status update arrived.""" + self.status = status_value + for cb_id, (status, success, error) in list(self._status_callbacks.items()): + with status._lock: + if status.done: + self._status_callbacks.pop(cb_id, None) + continue + if status_value in success: + status.set_finished() + self._status_callbacks.pop(cb_id, None) + elif status_value in error: + status.set_exception( + TimePixStatusError( + f"TimePixFly state '{status_value.value}': {self.error_message}" + ) + ) + self._status_callbacks.pop(cb_id, None) @pytest.fixture(scope="function") -def timepix_fly_backend(): - """Fixture for creating a Timepix Fly backend instance.""" - backend = TimepixFlyBackend(backend_rest_url="http://localhost:8000") - yield backend +def backend_with_states(): + """Return a backend together with a helper that emits backend states.""" + with mock.patch( + "superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend.TimepixFlyClient", + FakeTimepixFlyClient, + ): + backend = TimepixFlyBackend(backend_rest_url="localhost:8452", hostname="localhost") + yield backend, backend.timepix_fly_client backend.on_destroy() + + +@pytest.fixture(scope="function") +def pixel_map(): + """Small valid pixel map for backend unit tests.""" + return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]]) + + +def test_timepix_fly_backend_stage_pushes_configuration(backend_with_states, pixel_map): + """Stage should push both config objects to the client.""" + backend, client = backend_with_states + other_config = OtherConfigModel(output_uri="tcp:localhost:9000", TRoiStep=2, TRoiN=16) + + backend.on_stage(other_config=other_config, pixel_map=pixel_map) + + client.set_other_config.assert_called_once_with(other_config) + client.set_pixel_map.assert_called_once_with(pixel_map) + + +def test_timepix_fly_backend_trigger_callback_success(backend_with_states): + """Trigger status should resolve once the backend reports await_connection.""" + backend, client = backend_with_states + + status = backend.on_trigger() + assert status.done is False + + client.emit_status(TimePixFlyStatus.AWAIT_CONNECTION) + + status.wait(timeout=1) + assert status.done is True + assert status.success is True + + +def test_timepix_fly_backend_trigger_callback_error(backend_with_states): + """Trigger status should fail when the backend reports an exception state.""" + backend, client = backend_with_states + + status = backend.on_trigger() + client.error_message = "failed to configure" + client.emit_status(TimePixFlyStatus.EXCEPT) + + with pytest.raises(TimePixStatusError, match="failed to configure"): + status.wait(timeout=1) + + +def test_timepix_fly_backend_complete_callback_success(backend_with_states): + """Complete status should resolve when the backend goes back to config.""" + backend, client = backend_with_states + + client.emit_status(TimePixFlyStatus.COLLECT) + status = backend.on_complete() + assert status.done is False + assert status.success is False + client.emit_status(TimePixFlyStatus.CONFIG) + status.wait(timeout=1) + assert status.done is True + assert status.success is True + + client.emit_status(TimePixFlyStatus.COLLECT) + status = backend.on_complete() + client.error_message = "unexpected error during collection" + client.emit_status(TimePixFlyStatus.EXCEPT) + with pytest.raises( + TimePixStatusError, match="TimePixFly state 'except': unexpected error during collection" + ): + status.wait(timeout=1) + + +def test_timepix_fly_backend_stop_cancels_tracked_statuses(backend_with_states): + """Stopping the backend should fail all tracked statuses and stop collection.""" + backend, client = backend_with_states + status = StatusBase() + backend.cancel_on_stop(status) + + backend.on_stop() + + client.stop_running_collection.assert_called_once() + with pytest.raises(RuntimeError, match="Stop called on device"): + status.wait(timeout=1) + + +def test_timepix_fly_backend_add_and_remove_callback(backend_with_states): + """Callbacks can be registered and removed by id.""" + backend, _ = backend_with_states + + cb_id = backend.add_callback(lambda *_args, **_kwargs: None, kwd={"scan_id": 5}) + + stored_cb_id = next(iter(backend.callbacks)) + assert str(stored_cb_id) == cb_id + backend.remove_callback(stored_cb_id) + assert stored_cb_id not in backend.callbacks + + +def test_timepix_fly_backend_decode_end_frame_runs_callbacks(backend_with_states): + """The buffered frame callback should be invoked only once EndFrame arrives.""" + backend, _ = backend_with_states + received = {} + + def callback(start_frame, data_frames, end_frame, scan_id): + received["start_frame"] = start_frame + received["data_frames"] = data_frames + received["end_frame"] = end_frame + received["scan_id"] = scan_id + + backend.add_callback(callback, kwd={"scan_id": 7}) + backend._decode_received_data( + '{"type":"StartFrame","Mode":"TOA","TRoiStart":0,"TRoiStep":1,"TRoiN":2,"NumEnergyPoints":2,"save_interval":10}' + ) + backend._decode_received_data( + '{"type":"XesData","period":1,"TDSpectra":[1,2,3,4],"totalEvents":4,"beforeROI":0,"afterROI":0}' + ) + backend._decode_received_data('{"type":"EndFrame","error":"","periods":5}') + + assert received["start_frame"]["type"] == "StartFrame" + assert received["data_frames"][0]["type"] == "XesData" + assert received["end_frame"]["type"] == "EndFrame" + assert received["scan_id"] == 7 + assert backend._TimepixFlyBackend__msg_buffer == []