Files
superxas_bec/tests/tests_devices/test_timepix.py
T
x10da d539ae5e98
CI for superxas_bec / test (pull_request) Successful in 1m26s
CI for superxas_bec / test (push) Successful in 1m45s
fix(timepix): Bugfixes and tests after adapting to the revised TimepixFly backend
2026-05-13 12:44:51 +02:00

427 lines
16 KiB
Python

"""Unit tests for the Timepix device with a mocked backend."""
from __future__ import annotations
import threading
from types import SimpleNamespace
from unittest import mock
import numpy as np
import ophyd
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd import DeviceStatus, StatusBase
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from superxas_bec.devices.timepix.timepix import (
ACQUIRESTATUS,
DATASOURCE,
EXPOSUREMODE,
FILEWRITEMODE,
TRIGGERMODE,
TRIGGERSOURCE,
TDCEdge,
TDCOuput,
Timepix,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
NetAddresses,
PixelMap,
)
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
class FakeBackendClient:
"""Backend client double that can finish status callbacks on demand."""
def __init__(self):
self.status = TimePixFlyStatus.CONFIG
self._status_callbacks = {}
def add_status_callback(self, status, success, error, run=True):
"""Register status callback with optional immediate completion."""
if run and self.status in success:
status.set_finished()
return
self._status_callbacks[id(status)] = (status, success, error)
def emit_status(self, status_value: TimePixFlyStatus):
"""Resolve tracked status objects with a simulated backend state."""
self.status = status_value
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
with status._lock:
if status.done:
self._status_callbacks.pop(cb_id, None)
continue
if status_value in success:
status.set_finished()
self._status_callbacks.pop(cb_id, None)
elif status_value in error:
status.set_exception(RuntimeError(f"backend entered {status_value.value}"))
self._status_callbacks.pop(cb_id, None)
def get_net_addresses(self):
"""Return a deterministic backend raw-data address."""
return NetAddresses(
control="127.0.0.1:8452", address="127.0.0.1:8451", server="127.0.0.1:8080"
)
class FakeBackend:
"""Minimal backend double used to isolate Timepix from backend integration."""
def __init__(self, *args, **kwargs):
self.hostname = kwargs.get("hostname") or "localhost"
self.socket_port = kwargs.get("socket_port", 9876)
self.timepix_fly_client = FakeBackendClient()
self.on_connected = mock.Mock()
self.on_stage = mock.Mock()
self.on_stop = mock.Mock()
self.on_destroy = mock.Mock()
self.add_callback = mock.Mock()
self._trigger_status = StatusBase()
self._trigger_status.set_finished()
self._complete_status = StatusBase()
self._complete_status.set_finished()
def on_trigger(self):
"""Return a backend-prepared trigger status."""
return self._trigger_status
def on_trigger_finished(self):
"""Return the status that resolves when acquisition is complete."""
return self._complete_status
def on_complete(self, status=None):
"""Return a backend completion status."""
if status is None:
return self._complete_status
status.set_finished()
return status
def _finished_status(device=None):
"""Create a finished status for mocked signal set operations."""
status = DeviceStatus(device=device)
status.set_finished()
return status
def _force_signal_value(signal, value):
"""Set a mocked PV-backed signal value, including read-only EPICS signals."""
if hasattr(signal, "_read_pv"):
signal._read_pv.mock_data = value
return
signal.put(value)
def _message_value(signal):
"""Extract the signal payload from a BEC message signal."""
msg = signal.get()
return msg.signals[signal.name]["value"]
@pytest.fixture(scope="function")
def timepix():
"""Timepix device with mocked EPICS signals and a fully mocked backend."""
backend = FakeBackend()
scan_info = SimpleNamespace(
msg=SimpleNamespace(
scan_name="step_scan",
scan_parameters={"exp_time": 0.1, "frames_per_trigger": 2},
num_points=3,
)
)
with (
mock.patch.object(ophyd, "cl") as mock_cl,
mock.patch("superxas_bec.devices.timepix.timepix.TimepixFlyBackend", return_value=backend),
):
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Timepix(
name="timepix",
prefix="X10DA-ES-TPX1:",
backend_rest_url="localhost:8452",
hostname="localhost",
socket_port=9876,
scan_info=scan_info,
device_manager=DMMock(),
)
patch_dual_pvs(dev)
dev.backend = backend
dev._poll_thread = mock.Mock()
for walk in dev.walk_signals():
signal = walk.item
if hasattr(signal, "set") and hasattr(signal, "put"):
signal.set = mock.Mock(side_effect=lambda value, _sig=signal, **_kw: (_force_signal_value(_sig, value), _finished_status(_sig))[1]) # type: ignore[method-assign]
yield dev
dev._poll_thread_kill_event.set()
@pytest.fixture(scope="function")
def pixel_map():
"""Small valid pixel map used for stage tests."""
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
def test_timepix_on_connected_configures_signals_and_registers_callback(timepix):
"""Connected hook should configure camera, file writer, and backend callback."""
timepix.on_connected()
assert timepix.cam.tdc1_enable.get() == 1
assert timepix.cam.tdc2_enable.get() == 1
assert timepix.cam.raw_enable.get() == 1
assert timepix.cam.tdc1_edge.get() == TDCEdge.RISING
assert timepix.cam.tdc1_output.get() == TDCOuput.ALL_CHANNELS
assert timepix.cam.trigger_mode.get() == TRIGGERMODE.INTERNAL
assert timepix.cam.trigger_source.get() == TRIGGERSOURCE.HDMI1_1
assert timepix.cam.exposure_mode.get() == EXPOSUREMODE.TIMED
assert timepix.cam.array_counter.get() == 0
assert timepix.hdf.enable.get() == "1"
assert timepix.hdf.file_write_mode.get() == FILEWRITEMODE.STREAM.value
assert timepix.hdf.auto_save.get() == 1
assert timepix.cam.array_callbacks.get() == 1
timepix.backend.add_callback.assert_called_once_with(timepix.msg_buffer_callback)
timepix._poll_thread.start.assert_called_once()
def test_timepix_on_stage_configures_camera_writer_and_mocked_backend(timepix, pixel_map):
"""Stage should configure camera settings and forward config to the backend."""
timepix._pixel_map = pixel_map
with (
mock.patch(
"superxas_bec.devices.timepix.timepix.get_full_path",
return_value="/tmp/timepix/test_scan.h5",
),
):
_force_signal_value(timepix.hdf.enable, "Enable")
timepix.on_stage()
assert timepix.cam.acquire_time.get() == pytest.approx(0.1 - timepix._readout_time)
assert timepix.cam.acquire_period.get() == pytest.approx(0.1)
assert timepix.cam.num_images.get() == 2
assert timepix.cam.data_source.get() == DATASOURCE.IMAGE
assert timepix.hdf.file_path.get() == "/tmp/timepix"
assert timepix.hdf.file_name.get() == "test_scan.h5"
assert timepix.hdf.num_capture.get() == 6
assert timepix.hdf.capture.get() == 1
assert timepix.cam.raw_file_template.get() == ""
assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451"
assert timepix.backend.on_stage.call_count == 1
other_config = timepix.backend.on_stage.call_args.kwargs["other_config"]
assert other_config.output_uri == "tcp:localhost:9876"
assert other_config.TRoiStep == timepix.troistep
assert other_config.TRoiN == timepix.troin
assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map
file_event = timepix.file_event.get()
assert file_event.file_path == "/tmp/timepix/test_scan.h5"
assert file_event.done is False
assert file_event.successful is False
def test_timepix_on_stage_configures_mocked_backend(timepix, pixel_map):
"""Stage should configure camera settings and forward config to the backend."""
timepix._pixel_map = pixel_map
with (
mock.patch(
"superxas_bec.devices.timepix.timepix.get_full_path",
return_value="/tmp/timepix/test_scan.h5",
),
):
_force_signal_value(timepix.hdf.enable, "Disable")
# File event should not be emitted when hdf.enable is "Disable"
with mock.patch.object(timepix.file_event, "put") as mock_file_event_put:
timepix.hdf.capture.put(0)
timepix.on_stage()
mock_file_event_put.assert_not_called()
assert timepix.hdf.capture.get() == 0
assert timepix.cam.acquire_time.get() == pytest.approx(0.1 - timepix._readout_time)
assert timepix.cam.acquire_period.get() == pytest.approx(0.1)
assert timepix.cam.num_images.get() == 2
assert timepix.cam.data_source.get() == DATASOURCE.IMAGE
assert timepix.cam.raw_file_template.get() == ""
assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451"
assert timepix.backend.on_stage.call_count == 1
other_config = timepix.backend.on_stage.call_args.kwargs["other_config"]
assert other_config.output_uri == "tcp:localhost:9876"
assert other_config.TRoiStep == timepix.troistep
assert other_config.TRoiN == timepix.troin
assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map
def test_timepix_on_stage_without_xes_skips_backend_configuration(timepix):
"""When XES is disabled, the backend-specific stage call should be skipped."""
timepix.enable_xes = False
with mock.patch(
"superxas_bec.devices.timepix.timepix.get_full_path",
return_value="/tmp/timepix/no_xes_scan.h5",
):
timepix.on_stage()
timepix.backend.on_stage.assert_not_called()
def test_timepix_on_trigger_combines_camera_and_backend_status(timepix):
"""Trigger should arm the backend first and then drive the camera."""
backend_done = StatusBase()
backend_done.set_finished()
backend_finished = StatusBase()
timepix.backend._trigger_status = backend_done
timepix.backend._complete_status = backend_finished
status = timepix.on_trigger()
assert isinstance(status, StatusBase)
assert timepix.cam.acquire.get() == 1
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
backend_finished.set_finished()
status.wait(timeout=0.1)
assert status.done is True
assert status.success is True
def test_timepix_on_complete(timepix):
"""Complete should wait for writer/backend completion and emit a success file event."""
# I. Case I. With hdf.enable "Enable", should wait for both writer and camera to be done before emitting file event
timepix.hdf.enable.put("Enable")
timepix._full_path = "/tmp/timepix/final_scan.h5"
timepix._n_images = 3
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
timepix.hdf.capture.put(ACQUIRESTATUS.ACQUIRING)
timepix.hdf.write_file.put(ACQUIRESTATUS.ACQUIRING)
status = timepix.on_complete()
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
timepix.hdf.capture.put(ACQUIRESTATUS.DONE)
timepix.hdf.write_file.put(ACQUIRESTATUS.DONE)
timepix.hdf.write_status.put(0)
_force_signal_value(timepix.hdf.num_captured, 3)
status.wait(timeout=1)
file_event = timepix.file_event.get()
assert file_event.file_path == "/tmp/timepix/final_scan.h5"
assert file_event.done is True
assert file_event.successful is True
# II. Case II. With hdf.enable "Disable", should not wait for writer and only wait for camera to be done before emitting file event
timepix.hdf.enable.put("Disable")
timepix._n_images = 3
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
timepix.hdf.capture.put(ACQUIRESTATUS.ACQUIRING)
timepix.hdf.write_file.put(ACQUIRESTATUS.ACQUIRING)
with mock.patch.object(timepix.file_event, "put") as mock_file_event_put:
status = timepix.on_complete()
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
status.wait(timeout=1)
mock_file_event_put.assert_not_called()
assert status.done is True
assert status.success is True
def test_timepix_msg_buffer_callback_updates_xes_signals(timepix):
"""The backend message callback should populate all exported XES data signals."""
start_frame = {
"type": "StartFrame",
"Mode": "TOA",
"TRoiStart": 0,
"TRoiStep": 1,
"TRoiN": 2,
"NumEnergyPoints": 8,
"save_interval": 1,
}
data_frames = [
{
"type": "XesData",
"period": 131000,
"totalEvents": 36,
"TDSpectra": list(range(16)),
"beforeROI": 0,
"afterROI": 0,
}
]
end_frame = {"type": "EndFrame", "error": "", "periods": 4}
timepix.troin = 2
timepix.msg_buffer_callback(start_frame, data_frames, end_frame)
expected_xes = np.array(
[[0, 1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]], dtype=np.float32
)
np.testing.assert_array_equal(_message_value(timepix.xes_data), expected_xes)
np.testing.assert_array_equal(
_message_value(timepix.xes_spectra), np.array([28, 92], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_energy_1), np.array([6, 38], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_energy_2), np.array([22, 54], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_data_accumulated_1), np.array([6, 38], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_data_accumulated_2), np.array([22, 54], dtype=np.float32)
)
assert _message_value(timepix.total_periods) == pytest.approx(4 / 131000)
np.testing.assert_array_equal(_message_value(timepix.tds_period), np.array([1.0]))
assert _message_value(timepix.total_events) == 36
def test_timepix_on_stop_stops_camera_writer_and_backend(timepix):
"""Stop should stop camera acquisition and delegate backend stop."""
timepix.cam.acquire.put(1)
timepix.hdf.capture.put(1)
timepix.on_stop()
assert timepix.cam.acquire.get() == 0
assert timepix.hdf.capture.get() == 0
timepix.backend.on_stop.assert_called_once()
def test_timepix_on_destroy_cleans_up_backend(timepix):
"""Destroy should stop polling and forward cleanup to the backend."""
timepix.on_destroy()
assert timepix._poll_thread_kill_event.is_set() is True
timepix.backend.on_stop.assert_called_once()
timepix.backend.on_destroy.assert_called_once()
def test_timepix_on_prescan_returns_correct_status(timepix):
"""Prescan should return a finished status if the backend is ready."""
# Case I. With hdf.enable "Enable", should wait for both writer and camera to be ready
timepix.hdf.enable._read_pv.mock_data = "Enable"
timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING
timepix.hdf.capture._read_pv.mock_data = ACQUIRESTATUS.DONE
# Should be combined status of writer and control
status = timepix.on_pre_scan()
assert status.done is False
timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE
assert status.done is False
timepix.hdf.capture._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING
status.wait(timeout=1)
assert status.done is True
assert status.success is True
# Case II. With hdf.enable "Enable", should wait for both writer and camera to be ready
timepix.hdf.enable._read_pv.mock_data = "Disable"
timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING
status = timepix.on_pre_scan()
assert status.done is False
timepix.cam.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE
status.wait(timeout=1)
assert status.done is True
assert status.success is True