fix(timepix): Bugfixes and tests after adapting to the revised TimepixFly backend
CI for superxas_bec / test (pull_request) Successful in 1m26s
CI for superxas_bec / test (push) Successful in 1m45s

This commit was merged in pull request #17.
This commit is contained in:
x10da
2026-05-11 17:35:25 +02:00
committed by appel_c
parent d17f3deefa
commit d539ae5e98
5 changed files with 740 additions and 153 deletions
+100 -139
View File
@@ -19,11 +19,14 @@ from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from ophyd import ADBase
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
from ophyd import Signal
from ophyd_devices import (
AsyncSignal,
CompareStatus,
DeviceStatus,
EpicsSignalRO,
EpicsSignalWithRBV,
ExceptionStatus,
FileEventSignal,
PreviewSignal,
StatusBase,
@@ -134,6 +137,13 @@ class FILEWRITEMODE(int, enum.Enum):
STREAM = 2
class IMAGEMODE(int, enum.Enum):
"""Values for ImageMode PV"""
SINGLE = 0
MULTIPLE = 1
def load_pixel_map_from_json(file_path: str) -> PixelMap:
"""Load a pixel map from a JSON file.
@@ -182,112 +192,36 @@ DETECTOR_SHAPE = (512, 1024) # Shape of the TimePix detector
class Timepix(PSIDeviceBase, TimePixControl):
"""
TimePix class. The IOC is running with the prefix 'X10DA-ES-TPX1:'.
The TimePixFly backend is running on p4-0017.psi.ch. Please check the port from the app
running in headless server mode. The backend_rest url can for instance be 'p4-0017.psi.ch:8452'.
The hostname needs to be set to the name of this machine, e.h. x10da-bec-001.psi.ch.
Integration of the Timepix detector for the SuperXAS beamline. The control interface is
given through the ASItpxCam EPICS interface, which is implemented in the TimePixControl class.
The IOC is running with the prefix 'X10DA-ES-TPX1:'.
The backend (TimePixFly) for XES data is running on p6-0008.psi.ch. Please check that the
backend is running in headless server mode. It must be started with an appropriate REST API URL
and socket port that are accessible from the BEC server, i.e. with option -cP6-0008.psi.ch:8452.
"""
MIN_DETECTOR_READOUT_TIME = 2.1e-3 # Minimum readout time in seconds for ASI TimePix detector
_DETECTOR_SHAPE = DETECTOR_SHAPE
USER_ACCESS = [
"troin",
"troistep",
"get_pixel_map",
"set_pixel_map",
"set_pixel_map_from_json_file",
"set_enable_xes",
]
# fmt: off
USER_ACCESS = ["troin", "troistep", "get_pixel_map", "set_pixel_map", "set_pixel_map_from_json_file", "set_enable_xes", "set_enable_image_writing"]
xes_data = Cpt(AsyncSignal, name="xes_data", ndim=2, max_size=1000, doc="Full XES data, 2D image with energypoints vs time bins.")
xes_spectra = Cpt(AsyncSignal, name="xes_spectra", ndim=1, max_size=1000, doc="1D spectra, integrated over energy bins.")
xes_energy_1 = Cpt(AsyncSignal, name="xes_energy_1", ndim=1, max_size=1000, doc="1D time spectra for energy bin 1.")
xes_energy_2 = Cpt(AsyncSignal, name="xes_energy_2", ndim=1, max_size=1000, doc="1D time spectra for energy bin 2.")
tds_period = Cpt(AsyncSignal, name="tds_period", ndim=0, async_update={"type": "add", "max_shape": [None]}, max_size=1000, doc="TDS period recorded by the TimePixFly backend detector.")
total_periods = Cpt(AsyncSignal, name="total_periods", ndim=0, async_update={"type": "add", "max_shape": [None]}, max_size=1000, doc="Total TDS periods recorded by the TimePixFly backend detector.")
total_events = Cpt(AsyncSignal, name="total_events", ndim=0, async_update={"type": "add", "max_shape": [None]}, max_size=1000, doc="Total events recorded by the TimePixFly backend detector.")
xes_data = Cpt(
AsyncSignal,
name="xes_data",
ndim=2,
max_size=1000,
doc="Full XES data, 2D image with energypoints vs time bins.",
)
xes_spectra = Cpt(
AsyncSignal,
name="xes_spectra",
ndim=1,
max_size=1000,
doc="1D spectra, integrated over energy bins.",
)
xes_energy_1 = Cpt(
AsyncSignal,
name="xes_energy_1",
ndim=1,
max_size=1000,
doc="1D time spectra for energy bin 1.",
)
xes_energy_2 = Cpt(
AsyncSignal,
name="xes_energy_2",
ndim=1,
max_size=1000,
doc="1D time spectra for energy bin 2.",
)
tds_period = Cpt(
AsyncSignal,
name="tds_period",
ndim=0,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="TDS period recorded by the TimePixFly backend detector.",
)
total_periods = Cpt(
AsyncSignal,
name="total_periods",
ndim=0,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="Total TDS periods recorded by the TimePixFly backend detector.",
)
total_events = Cpt(
AsyncSignal,
name="total_events",
ndim=0,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="Total events recorded by the TimePixFly backend detector.",
)
preview = Cpt(PreviewSignal, name="preview", ndim=2, num_rotation_90=1, doc="Preview signal of the TimePix detector.")
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=1,
doc="Preview signal of the TimePix detector.",
)
static_spectra = Cpt(AsyncSignal, name="static_spectra", ndim=1, max_size=1000, acquisition_group="monitored", async_update={"type": "add", "max_shape": [None, DETECTOR_SHAPE[0]]}, doc="Spectra signal of the TimePix detector.")
static_spectra = Cpt(
AsyncSignal,
name="static_spectra",
ndim=1,
max_size=1000,
acquisition_group="monitored",
async_update={"type": "add", "max_shape": [None, DETECTOR_SHAPE[0]]},
doc="Spectra signal of the TimePix detector.",
)
xes_data_accumulated_1 = Cpt(
AsyncSignal,
name="xes_accumulated_energy_1",
ndim=1,
max_size=1000,
doc="1D time spectra for energy bin 2.",
)
xes_data_accumulated_2 = Cpt(
AsyncSignal,
name="xes_accumulated_energy_2",
ndim=1,
max_size=1000,
doc="1D time spectra for energy bin 2.",
)
file_event = Cpt(
FileEventSignal, name="file_event", doc="File event signal for TimePix detector."
)
xes_data_accumulated_1 = Cpt(AsyncSignal, name="xes_accumulated_energy_1", ndim=1, max_size=1000, doc="1D time spectra for energy bin 2.")
xes_data_accumulated_2 = Cpt(AsyncSignal, name="xes_accumulated_energy_2", ndim=1, max_size=1000, doc="1D time spectra for energy bin 2.")
file_event = Cpt(FileEventSignal, name="file_event", doc="File event signal for TimePix detector.")
# fmt: on
def __init__(
self,
@@ -511,6 +445,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
"""
self.enable_xes = enable
def set_enable_image_writing(self, enable: bool) -> None:
"""
Enable or disable image writing to file through the HDF5 plugin.
Args:
enable (bool): Whether to enable image writing.
"""
self.hdf.enable.set(1 if enable else 0).wait(timeout=self._pv_timeout)
@property
def enable_xes(self) -> bool:
"""Get whether XES data acquisition is enabled."""
@@ -610,6 +553,13 @@ class Timepix(PSIDeviceBase, TimePixControl):
self.cam.tdc2_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
self.cam.tdc2_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)
def wait_for_connection(
self, all_signals: bool = False, timeout: float | None = None, **kwargs
):
super().wait_for_connection(all_signals, timeout)
# Prepare backend for TimePixFly
self.backend.on_connected()
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
@@ -624,6 +574,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
self.cam.exposure_mode.set(EXPOSUREMODE.TIMED).wait(timeout=self._pv_timeout)
# Reset array counter on connect
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
# Set image mode to multiple
self.cam.image_mode.set(IMAGEMODE.MULTIPLE).wait(timeout=self._pv_timeout)
# ------------------
# Prepare file writing through AD HDF5 plugin
@@ -639,8 +591,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
# Prepare TimePixFly backend
# -----------------
# Prepare backend for TimePixFly
self.backend.on_connected()
# Register the callback for processing data received by the backend
self.backend.add_callback(self.msg_buffer_callback)
self._poll_thread.start()
@@ -693,18 +643,18 @@ class Timepix(PSIDeviceBase, TimePixControl):
file_path = "/".join(self._full_path.split("/")[:-1])
file_name = self._full_path.split("/")[-1]
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
# self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
self.hdf.file_path.set(file_path).wait(5)
self.hdf.file_name.set(file_name).wait(5)
# Setup file writing for the total expected number of images
self.hdf.num_capture.set(self._n_images).wait(5)
self.hdf.capture.put(1)
self.file_event.put(
file_path=self._full_path,
done=False,
successful=False,
hinted_h5_entries={"data": "/entry/data/data"},
)
if self.hdf.enable.get() == "Enable":
self.hdf.capture.put(1)
self.file_event.put(
file_path=self._full_path,
done=False,
successful=False,
hinted_h5_entries={"data": "/entry/data/data"},
)
# -------------------------
# XES specific staging
@@ -714,8 +664,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
TRoiStep=self.troistep,
TRoiN=self.troin,
output_uri=f"tcp:{self.backend.hostname}:{self.backend.socket_port}",
save_interval=int(131000 / 5) - 5, # Save interval in 131kHz units,
)
save_interval=int(131000 / 5) - 5,
) # Save interval in 131kHz units,
logger.debug(f"Current TimePixFly configuration: {other_config}")
pixel_map = self.pixel_map
self.backend.on_stage(other_config=other_config, pixel_map=pixel_map)
@@ -740,11 +690,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
status_camera = CompareStatus(
self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout
)
status_writer = CompareStatus(
self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
)
status = status_camera & status_writer
status_writer = None
if self.hdf.enable.get() == "Enable":
status_writer = CompareStatus(
self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout
)
if status_writer:
status = status_camera & status_writer
else:
status = status_camera
self.cancel_on_stop(status)
return status
@@ -799,12 +753,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
# Status Camera
status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
return_status = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
# Status Writer
st1 = CompareStatus(self.hdf.capture, ACQUIRESTATUS.DONE)
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
status_writer = st1 & st2 & status_written_images
status_writer = None
if self.hdf.enable.get() == "Enable":
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
st3 = ExceptionStatus(self.hdf.write_status, 0, operation="!=")
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
status_writer = st1 & st2 & status_written_images & st3
# Status Backend
status_backend = None
@@ -813,9 +770,9 @@ class Timepix(PSIDeviceBase, TimePixControl):
status_backend = self.backend.on_complete(status=status_backend)
# Combine the statuses
if status_backend is not None:
return_status = status_backend & status_camera & status_writer
else:
return_status = status_camera & status_writer
return_status = status_backend & return_status
if status_writer is not None:
return_status = return_status & status_writer
return_status.add_callback(self._complete_callback)
self.cancel_on_stop(return_status)
@@ -823,20 +780,24 @@ class Timepix(PSIDeviceBase, TimePixControl):
def _complete_callback(self, status: CompareStatus) -> None:
"""Callback for when the device completes a scan."""
if (
self.hdf.enable.get() != "Enable"
): # TODO: Not sure if we should support disabled file writing.
return
if status.success:
self.file_event.put(
file_path=self._full_path, # pylint: disable:protected-access
file_path=self._full_path,
done=True,
successful=True,
hinted_h5_entries={"data": "/entry/data/data"},
)
) # pylint: disable:protected-access
else:
self.file_event.put(
file_path=self._full_path, # pylint: disable:protected-access
file_path=self._full_path,
done=True,
successful=False,
hinted_h5_entries={"data": "/entry/data/data"},
)
) # pylint: disable:protected-access
def on_stop(self) -> None:
"""Called when the device is stopped."""
@@ -849,11 +810,14 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_destroy(self):
"""Cleanup method to stop the device and clean up resources."""
self.cam.acquire.put(0)
self.hdf.capture.put(0)
self._poll_thread_kill_event.set()
self.backend.on_stop()
self.backend.on_destroy()
try:
self.cam.acquire.put(0)
self.hdf.capture.put(0)
self._poll_thread_kill_event.set()
self.backend.on_stop()
self.backend.on_destroy()
except Exception:
logger.warning(f"Failed to destroy {self.name}.")
# pylint: disable=protected-access
@@ -862,9 +826,9 @@ if __name__ == "__main__": # pragma: no cover
timepix = Timepix(
name="timepix",
prefix="X10DA-ES-TPX1:",
backend_rest_url="P6-0008.psi.ch:8452", # "P4-0017.psi.ch:8452",
backend_rest_url="P6-0008.psi.ch:8452",
hostname="x10da-bec-001.psi.ch",
)
) # "P4-0017.psi.ch:8452",
try:
# timepix.wait_for_connection(all_signals=True, timeout=10)
timepix.on_connected()
@@ -877,11 +841,8 @@ if __name__ == "__main__": # pragma: no cover
)
time.sleep(0.5)
timepix.scan_info.msg.scan_parameters.update(
{
"exp_time": exp_time, # Set exposure time to 1 second for testing
"frames_per_trigger": frames_per_trigger, # Set frames per trigger to 5 for testing
}
)
{"exp_time": exp_time, "frames_per_trigger": frames_per_trigger}
) # Set exposure time to 1 second for testing # Set frames per trigger to 5 for testing
timepix.stage()
logger.warning(f"Timepix on stage done")
timepix.pre_scan()
@@ -12,7 +12,6 @@ hooks for all the relevant ophyd interface, 'on_stage',
from __future__ import annotations
import json
import signal
import socket
import threading
import time
@@ -81,14 +80,14 @@ class TimepixFlyBackend:
###### Hooks for the PSIDeviceBase interface ######
###################################################
def on_connected(self):
def on_connected(self, timeout: float = 10):
"""Called if it is ensured that the device is connected."""
time_started = time.time()
logger.info("Connecting to Timepix Fly backend...")
try:
self.timepix_fly_client.on_connected()
self.timepix_fly_client.on_connected(timeout=timeout / 2)
status = self.start_data_server()
status.wait(timeout=5)
status.wait(timeout=timeout / 2)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error starting data server: {content}")
@@ -239,6 +238,7 @@ class TimepixFlyBackend:
def on_destroy(self):
"""Hook for on_destroy logic."""
time_started = time.time()
self.timepix_fly_client.shutdown()
self._data_thread_shutdown_event.set()
if self._data_thread is not None and self._data_thread.is_alive():
@@ -255,6 +255,9 @@ class TimepixFlyBackend:
except Exception:
content = traceback.format_exc()
logger.error(f"Error closing socket server: {content}")
logger.info(
f"Timepix Fly backend destroyed and resources cleaned up after {time.time() - time_started:.3f} seconds."
)
def on_stop(self):
"""Hook for on_stop logic."""
@@ -88,7 +88,7 @@ class TimepixFlyClient:
### Utility Methods ###
#############################
def on_connected(self) -> None:
def on_connected(self, timeout: float = 5) -> None:
"""
Called when the client is connected to the TimePix server.
This method can be overridden to perform actions when the client connects.
@@ -96,7 +96,7 @@ class TimepixFlyClient:
try:
self.stop_running_collection()
self.connect()
self.wait_for_connection(timeout=5)
self.wait_for_connection(timeout=timeout)
except Exception:
content = traceback.format_exc()
@@ -270,16 +270,18 @@ class TimepixFlyClient:
continue
if status in success:
dev_status.set_finished()
logger.debug(f"Status callback finished in succes: {status.value}")
logger.debug(f"Status callback finished in success: {status.value}")
self._status_callbacks.pop(cb_id)
elif status in error:
try:
last_error = self.last_error()
raise TimePixStatusError(
f"TimePixFly Backend state '{status.value}' is in list of specified errors {error}. Last error message: {last_error.message}"
f"TimePixFly state '{status.value}': {last_error.message}"
)
except Exception as e:
logger.error(f"Error in status callback from TimepixFly Backend: {e}")
logger.error(
f"Error in status callback for '{status.value}' from TimepixFly backend: {e}"
)
dev_status.set_exception(e)
self._status_callbacks.pop(cb_id)
# Reset the _started flag if the status is in CONFIG.
+426
View File
@@ -0,0 +1,426 @@
"""Unit tests for the Timepix device with a mocked backend."""
from __future__ import annotations
import threading
from types import SimpleNamespace
from unittest import mock
import numpy as np
import ophyd
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd import DeviceStatus, StatusBase
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from superxas_bec.devices.timepix.timepix import (
ACQUIRESTATUS,
DATASOURCE,
EXPOSUREMODE,
FILEWRITEMODE,
TRIGGERMODE,
TRIGGERSOURCE,
TDCEdge,
TDCOuput,
Timepix,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
NetAddresses,
PixelMap,
)
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
class FakeBackendClient:
"""Backend client double that can finish status callbacks on demand."""
def __init__(self):
self.status = TimePixFlyStatus.CONFIG
self._status_callbacks = {}
def add_status_callback(self, status, success, error, run=True):
"""Register status callback with optional immediate completion."""
if run and self.status in success:
status.set_finished()
return
self._status_callbacks[id(status)] = (status, success, error)
def emit_status(self, status_value: TimePixFlyStatus):
"""Resolve tracked status objects with a simulated backend state."""
self.status = status_value
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
with status._lock:
if status.done:
self._status_callbacks.pop(cb_id, None)
continue
if status_value in success:
status.set_finished()
self._status_callbacks.pop(cb_id, None)
elif status_value in error:
status.set_exception(RuntimeError(f"backend entered {status_value.value}"))
self._status_callbacks.pop(cb_id, None)
def get_net_addresses(self):
"""Return a deterministic backend raw-data address."""
return NetAddresses(
control="127.0.0.1:8452", address="127.0.0.1:8451", server="127.0.0.1:8080"
)
class FakeBackend:
"""Minimal backend double used to isolate Timepix from backend integration."""
def __init__(self, *args, **kwargs):
self.hostname = kwargs.get("hostname") or "localhost"
self.socket_port = kwargs.get("socket_port", 9876)
self.timepix_fly_client = FakeBackendClient()
self.on_connected = mock.Mock()
self.on_stage = mock.Mock()
self.on_stop = mock.Mock()
self.on_destroy = mock.Mock()
self.add_callback = mock.Mock()
self._trigger_status = StatusBase()
self._trigger_status.set_finished()
self._complete_status = StatusBase()
self._complete_status.set_finished()
def on_trigger(self):
"""Return a backend-prepared trigger status."""
return self._trigger_status
def on_trigger_finished(self):
"""Return the status that resolves when acquisition is complete."""
return self._complete_status
def on_complete(self, status=None):
"""Return a backend completion status."""
if status is None:
return self._complete_status
status.set_finished()
return status
def _finished_status(device=None):
"""Create a finished status for mocked signal set operations."""
status = DeviceStatus(device=device)
status.set_finished()
return status
def _force_signal_value(signal, value):
"""Set a mocked PV-backed signal value, including read-only EPICS signals."""
if hasattr(signal, "_read_pv"):
signal._read_pv.mock_data = value
return
signal.put(value)
def _message_value(signal):
"""Extract the signal payload from a BEC message signal."""
msg = signal.get()
return msg.signals[signal.name]["value"]
@pytest.fixture(scope="function")
def timepix():
"""Timepix device with mocked EPICS signals and a fully mocked backend."""
backend = FakeBackend()
scan_info = SimpleNamespace(
msg=SimpleNamespace(
scan_name="step_scan",
scan_parameters={"exp_time": 0.1, "frames_per_trigger": 2},
num_points=3,
)
)
with (
mock.patch.object(ophyd, "cl") as mock_cl,
mock.patch("superxas_bec.devices.timepix.timepix.TimepixFlyBackend", return_value=backend),
):
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Timepix(
name="timepix",
prefix="X10DA-ES-TPX1:",
backend_rest_url="localhost:8452",
hostname="localhost",
socket_port=9876,
scan_info=scan_info,
device_manager=DMMock(),
)
patch_dual_pvs(dev)
dev.backend = backend
dev._poll_thread = mock.Mock()
for walk in dev.walk_signals():
signal = walk.item
if hasattr(signal, "set") and hasattr(signal, "put"):
signal.set = mock.Mock(side_effect=lambda value, _sig=signal, **_kw: (_force_signal_value(_sig, value), _finished_status(_sig))[1]) # type: ignore[method-assign]
yield dev
dev._poll_thread_kill_event.set()
@pytest.fixture(scope="function")
def pixel_map():
"""Small valid pixel map used for stage tests."""
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
def test_timepix_on_connected_configures_signals_and_registers_callback(timepix):
"""Connected hook should configure camera, file writer, and backend callback."""
timepix.on_connected()
assert timepix.cam.tdc1_enable.get() == 1
assert timepix.cam.tdc2_enable.get() == 1
assert timepix.cam.raw_enable.get() == 1
assert timepix.cam.tdc1_edge.get() == TDCEdge.RISING
assert timepix.cam.tdc1_output.get() == TDCOuput.ALL_CHANNELS
assert timepix.cam.trigger_mode.get() == TRIGGERMODE.INTERNAL
assert timepix.cam.trigger_source.get() == TRIGGERSOURCE.HDMI1_1
assert timepix.cam.exposure_mode.get() == EXPOSUREMODE.TIMED
assert timepix.cam.array_counter.get() == 0
assert timepix.hdf.enable.get() == "1"
assert timepix.hdf.file_write_mode.get() == FILEWRITEMODE.STREAM.value
assert timepix.hdf.auto_save.get() == 1
assert timepix.cam.array_callbacks.get() == 1
timepix.backend.add_callback.assert_called_once_with(timepix.msg_buffer_callback)
timepix._poll_thread.start.assert_called_once()
def test_timepix_on_stage_configures_camera_writer_and_mocked_backend(timepix, pixel_map):
"""Stage should configure camera settings and forward config to the backend."""
timepix._pixel_map = pixel_map
with (
mock.patch(
"superxas_bec.devices.timepix.timepix.get_full_path",
return_value="/tmp/timepix/test_scan.h5",
),
):
_force_signal_value(timepix.hdf.enable, "Enable")
timepix.on_stage()
assert timepix.cam.acquire_time.get() == pytest.approx(0.1 - timepix._readout_time)
assert timepix.cam.acquire_period.get() == pytest.approx(0.1)
assert timepix.cam.num_images.get() == 2
assert timepix.cam.data_source.get() == DATASOURCE.IMAGE
assert timepix.hdf.file_path.get() == "/tmp/timepix"
assert timepix.hdf.file_name.get() == "test_scan.h5"
assert timepix.hdf.num_capture.get() == 6
assert timepix.hdf.capture.get() == 1
assert timepix.cam.raw_file_template.get() == ""
assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451"
assert timepix.backend.on_stage.call_count == 1
other_config = timepix.backend.on_stage.call_args.kwargs["other_config"]
assert other_config.output_uri == "tcp:localhost:9876"
assert other_config.TRoiStep == timepix.troistep
assert other_config.TRoiN == timepix.troin
assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map
file_event = timepix.file_event.get()
assert file_event.file_path == "/tmp/timepix/test_scan.h5"
assert file_event.done is False
assert file_event.successful is False
def test_timepix_on_stage_configures_mocked_backend(timepix, pixel_map):
"""Stage should configure camera settings and forward config to the backend."""
timepix._pixel_map = pixel_map
with (
mock.patch(
"superxas_bec.devices.timepix.timepix.get_full_path",
return_value="/tmp/timepix/test_scan.h5",
),
):
_force_signal_value(timepix.hdf.enable, "Disable")
# File event should not be emitted when hdf.enable is "Disable"
with mock.patch.object(timepix.file_event, "put") as mock_file_event_put:
timepix.hdf.capture.put(0)
timepix.on_stage()
mock_file_event_put.assert_not_called()
assert timepix.hdf.capture.get() == 0
assert timepix.cam.acquire_time.get() == pytest.approx(0.1 - timepix._readout_time)
assert timepix.cam.acquire_period.get() == pytest.approx(0.1)
assert timepix.cam.num_images.get() == 2
assert timepix.cam.data_source.get() == DATASOURCE.IMAGE
assert timepix.cam.raw_file_template.get() == ""
assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451"
assert timepix.backend.on_stage.call_count == 1
other_config = timepix.backend.on_stage.call_args.kwargs["other_config"]
assert other_config.output_uri == "tcp:localhost:9876"
assert other_config.TRoiStep == timepix.troistep
assert other_config.TRoiN == timepix.troin
assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map
def test_timepix_on_stage_without_xes_skips_backend_configuration(timepix):
"""When XES is disabled, the backend-specific stage call should be skipped."""
timepix.enable_xes = False
with mock.patch(
"superxas_bec.devices.timepix.timepix.get_full_path",
return_value="/tmp/timepix/no_xes_scan.h5",
):
timepix.on_stage()
timepix.backend.on_stage.assert_not_called()
def test_timepix_on_trigger_combines_camera_and_backend_status(timepix):
"""Trigger should arm the backend first and then drive the camera."""
backend_done = StatusBase()
backend_done.set_finished()
backend_finished = StatusBase()
timepix.backend._trigger_status = backend_done
timepix.backend._complete_status = backend_finished
status = timepix.on_trigger()
assert isinstance(status, StatusBase)
assert timepix.cam.acquire.get() == 1
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
backend_finished.set_finished()
status.wait(timeout=0.1)
assert status.done is True
assert status.success is True
def test_timepix_on_complete(timepix):
"""Complete should wait for writer/backend completion and emit a success file event."""
# I. Case I. With hdf.enable "Enable", should wait for both writer and camera to be done before emitting file event
timepix.hdf.enable.put("Enable")
timepix._full_path = "/tmp/timepix/final_scan.h5"
timepix._n_images = 3
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
timepix.hdf.capture.put(ACQUIRESTATUS.ACQUIRING)
timepix.hdf.write_file.put(ACQUIRESTATUS.ACQUIRING)
status = timepix.on_complete()
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
timepix.hdf.capture.put(ACQUIRESTATUS.DONE)
timepix.hdf.write_file.put(ACQUIRESTATUS.DONE)
timepix.hdf.write_status.put(0)
_force_signal_value(timepix.hdf.num_captured, 3)
status.wait(timeout=1)
file_event = timepix.file_event.get()
assert file_event.file_path == "/tmp/timepix/final_scan.h5"
assert file_event.done is True
assert file_event.successful is True
# II. Case II. With hdf.enable "Disable", should not wait for writer and only wait for camera to be done before emitting file event
timepix.hdf.enable.put("Disable")
timepix._n_images = 3
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
timepix.hdf.capture.put(ACQUIRESTATUS.ACQUIRING)
timepix.hdf.write_file.put(ACQUIRESTATUS.ACQUIRING)
with mock.patch.object(timepix.file_event, "put") as mock_file_event_put:
status = timepix.on_complete()
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
status.wait(timeout=1)
mock_file_event_put.assert_not_called()
assert status.done is True
assert status.success is True
def test_timepix_msg_buffer_callback_updates_xes_signals(timepix):
"""The backend message callback should populate all exported XES data signals."""
start_frame = {
"type": "StartFrame",
"Mode": "TOA",
"TRoiStart": 0,
"TRoiStep": 1,
"TRoiN": 2,
"NumEnergyPoints": 8,
"save_interval": 1,
}
data_frames = [
{
"type": "XesData",
"period": 131000,
"totalEvents": 36,
"TDSpectra": list(range(16)),
"beforeROI": 0,
"afterROI": 0,
}
]
end_frame = {"type": "EndFrame", "error": "", "periods": 4}
timepix.troin = 2
timepix.msg_buffer_callback(start_frame, data_frames, end_frame)
expected_xes = np.array(
[[0, 1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]], dtype=np.float32
)
np.testing.assert_array_equal(_message_value(timepix.xes_data), expected_xes)
np.testing.assert_array_equal(
_message_value(timepix.xes_spectra), np.array([28, 92], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_energy_1), np.array([6, 38], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_energy_2), np.array([22, 54], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_data_accumulated_1), np.array([6, 38], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_data_accumulated_2), np.array([22, 54], dtype=np.float32)
)
assert _message_value(timepix.total_periods) == pytest.approx(4 / 131000)
np.testing.assert_array_equal(_message_value(timepix.tds_period), np.array([1.0]))
assert _message_value(timepix.total_events) == 36
def test_timepix_on_stop_stops_camera_writer_and_backend(timepix):
"""Stop should stop camera acquisition and delegate backend stop."""
timepix.cam.acquire.put(1)
timepix.hdf.capture.put(1)
timepix.on_stop()
assert timepix.cam.acquire.get() == 0
assert timepix.hdf.capture.get() == 0
timepix.backend.on_stop.assert_called_once()
def test_timepix_on_destroy_cleans_up_backend(timepix):
"""Destroy should stop polling and forward cleanup to the backend."""
timepix.on_destroy()
assert timepix._poll_thread_kill_event.is_set() is True
timepix.backend.on_stop.assert_called_once()
timepix.backend.on_destroy.assert_called_once()
def test_timepix_on_prescan_returns_correct_status(timepix):
"""Prescan should return a finished status if the backend is ready."""
# Case I. With hdf.enable "Enable", should wait for both writer and camera to be ready
timepix.hdf.enable._read_pv.mock_data = "Enable"
timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING
timepix.hdf.capture._read_pv.mock_data = ACQUIRESTATUS.DONE
# Should be combined status of writer and control
status = timepix.on_pre_scan()
assert status.done is False
timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE
assert status.done is False
timepix.hdf.capture._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING
status.wait(timeout=1)
assert status.done is True
assert status.success is True
# Case II. With hdf.enable "Enable", should wait for both writer and camera to be ready
timepix.hdf.enable._read_pv.mock_data = "Disable"
timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING
status = timepix.on_pre_scan()
assert status.done is False
timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE
status.wait(timeout=1)
assert status.done is True
assert status.success is True
+200 -5
View File
@@ -1,15 +1,210 @@
"""This module tests the Timepix Fly backend functionality."""
"""Unit tests for the Timepix Fly backend."""
from __future__ import annotations
from types import SimpleNamespace
from unittest import mock
import pytest
from ophyd import StatusBase
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
TimePixFlyStatus,
TimePixStatusError,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
NetAddresses,
OtherConfigModel,
PixelMap,
)
class FakeTimepixFlyClient:
"""Minimal client double that can drive backend status callbacks."""
def __init__(self, rest_url: str, ws_url: str):
self.rest_url = rest_url
self.ws_url = ws_url
self.status = TimePixFlyStatus.CONFIG
self._status_callbacks = {}
self.error_message = "boom"
self.on_connected = mock.Mock()
self.shutdown = mock.Mock()
self.start = mock.Mock()
self.stop_running_collection = mock.Mock()
self.set_other_config = mock.Mock()
self.set_pixel_map = mock.Mock()
self.get_net_addresses = mock.Mock(
return_value=NetAddresses(
control="127.0.0.1:8452", address="127.0.0.1:8451", server="127.0.0.1:8080"
)
)
def add_status_callback(self, status, success, error, run=True):
"""Store callbacks and optionally resolve them immediately."""
if run:
if self.status in success:
status.set_finished()
return
if self.status in error:
status.set_exception(
TimePixStatusError(
f"TimePixFly state '{self.status.value}': {self.error_message}"
)
)
return
self._status_callbacks[id(status)] = (status, success, error)
def last_error(self):
"""Return a lightweight error object."""
return SimpleNamespace(message=self.error_message)
def emit_status(self, status_value: TimePixFlyStatus):
"""Resolve stored status callbacks as if a websocket status update arrived."""
self.status = status_value
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
with status._lock:
if status.done:
self._status_callbacks.pop(cb_id, None)
continue
if status_value in success:
status.set_finished()
self._status_callbacks.pop(cb_id, None)
elif status_value in error:
status.set_exception(
TimePixStatusError(
f"TimePixFly state '{status_value.value}': {self.error_message}"
)
)
self._status_callbacks.pop(cb_id, None)
@pytest.fixture(scope="function")
def timepix_fly_backend():
"""Fixture for creating a Timepix Fly backend instance."""
backend = TimepixFlyBackend(backend_rest_url="http://localhost:8000")
yield backend
def backend_with_states():
"""Return a backend together with a helper that emits backend states."""
with mock.patch(
"superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend.TimepixFlyClient",
FakeTimepixFlyClient,
):
backend = TimepixFlyBackend(backend_rest_url="localhost:8452", hostname="localhost")
yield backend, backend.timepix_fly_client
backend.on_destroy()
@pytest.fixture(scope="function")
def pixel_map():
"""Small valid pixel map for backend unit tests."""
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
def test_timepix_fly_backend_stage_pushes_configuration(backend_with_states, pixel_map):
"""Stage should push both config objects to the client."""
backend, client = backend_with_states
other_config = OtherConfigModel(output_uri="tcp:localhost:9000", TRoiStep=2, TRoiN=16)
backend.on_stage(other_config=other_config, pixel_map=pixel_map)
client.set_other_config.assert_called_once_with(other_config)
client.set_pixel_map.assert_called_once_with(pixel_map)
def test_timepix_fly_backend_trigger_callback_success(backend_with_states):
"""Trigger status should resolve once the backend reports await_connection."""
backend, client = backend_with_states
status = backend.on_trigger()
assert status.done is False
client.emit_status(TimePixFlyStatus.AWAIT_CONNECTION)
status.wait(timeout=1)
assert status.done is True
assert status.success is True
def test_timepix_fly_backend_trigger_callback_error(backend_with_states):
"""Trigger status should fail when the backend reports an exception state."""
backend, client = backend_with_states
status = backend.on_trigger()
client.error_message = "failed to configure"
client.emit_status(TimePixFlyStatus.EXCEPT)
with pytest.raises(TimePixStatusError, match="failed to configure"):
status.wait(timeout=1)
def test_timepix_fly_backend_complete_callback_success(backend_with_states):
"""Complete status should resolve when the backend goes back to config."""
backend, client = backend_with_states
client.emit_status(TimePixFlyStatus.COLLECT)
status = backend.on_complete()
assert status.done is False
assert status.success is False
client.emit_status(TimePixFlyStatus.CONFIG)
status.wait(timeout=1)
assert status.done is True
assert status.success is True
client.emit_status(TimePixFlyStatus.COLLECT)
status = backend.on_complete()
client.error_message = "unexpected error during collection"
client.emit_status(TimePixFlyStatus.EXCEPT)
with pytest.raises(
TimePixStatusError, match="TimePixFly state 'except': unexpected error during collection"
):
status.wait(timeout=1)
def test_timepix_fly_backend_stop_cancels_tracked_statuses(backend_with_states):
"""Stopping the backend should fail all tracked statuses and stop collection."""
backend, client = backend_with_states
status = StatusBase()
backend.cancel_on_stop(status)
backend.on_stop()
client.stop_running_collection.assert_called_once()
with pytest.raises(RuntimeError, match="Stop called on device"):
status.wait(timeout=1)
def test_timepix_fly_backend_add_and_remove_callback(backend_with_states):
"""Callbacks can be registered and removed by id."""
backend, _ = backend_with_states
cb_id = backend.add_callback(lambda *_args, **_kwargs: None, kwd={"scan_id": 5})
stored_cb_id = next(iter(backend.callbacks))
assert str(stored_cb_id) == cb_id
backend.remove_callback(stored_cb_id)
assert stored_cb_id not in backend.callbacks
def test_timepix_fly_backend_decode_end_frame_runs_callbacks(backend_with_states):
"""The buffered frame callback should be invoked only once EndFrame arrives."""
backend, _ = backend_with_states
received = {}
def callback(start_frame, data_frames, end_frame, scan_id):
received["start_frame"] = start_frame
received["data_frames"] = data_frames
received["end_frame"] = end_frame
received["scan_id"] = scan_id
backend.add_callback(callback, kwd={"scan_id": 7})
backend._decode_received_data(
'{"type":"StartFrame","Mode":"TOA","TRoiStart":0,"TRoiStep":1,"TRoiN":2,"NumEnergyPoints":2,"save_interval":10}'
)
backend._decode_received_data(
'{"type":"XesData","period":1,"TDSpectra":[1,2,3,4],"totalEvents":4,"beforeROI":0,"afterROI":0}'
)
backend._decode_received_data('{"type":"EndFrame","error":"","periods":5}')
assert received["start_frame"]["type"] == "StartFrame"
assert received["data_frames"][0]["type"] == "XesData"
assert received["end_frame"]["type"] == "EndFrame"
assert received["scan_id"] == 7
assert backend._TimepixFlyBackend__msg_buffer == []