3 Commits

Author SHA1 Message Date
appel_c 5b14a10063 wip fixes
CI for superxas_bec / test (push) Failing after 32s
2026-05-12 12:38:26 +02:00
appel_c 19182daa47 wip bugfixes 2026-05-12 11:08:42 +02:00
x10da 67a26f231d fix timepix integration at superxas
CI for superxas_bec / test (push) Successful in 34s
2026-05-11 17:35:25 +02:00
5 changed files with 609 additions and 39 deletions
+49 -23
View File
@@ -19,11 +19,14 @@ from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from ophyd import ADBase
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
from ophyd import Signal
from ophyd_devices import (
AsyncSignal,
CompareStatus,
DeviceStatus,
EpicsSignalRO,
EpicsSignalWithRBV,
ExceptionStatus,
FileEventSignal,
PreviewSignal,
StatusBase,
@@ -198,6 +201,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
"set_pixel_map",
"set_pixel_map_from_json_file",
"set_enable_xes",
"set_enable_image_writing",
]
xes_data = Cpt(
@@ -511,6 +515,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
"""
self.enable_xes = enable
def set_enable_image_writing(self, enable: bool) -> None:
"""
Enable or disable image writing to file through the HDF5 plugin.
Args:
enable (bool): Whether to enable image writing.
"""
self.hdf.enable.set(1 if enable else 0).wait(timeout=self._pv_timeout)
@property
def enable_xes(self) -> bool:
"""Get whether XES data acquisition is enabled."""
@@ -610,6 +623,13 @@ class Timepix(PSIDeviceBase, TimePixControl):
self.cam.tdc2_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
self.cam.tdc2_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)
def wait_for_connection(
self, all_signals: bool = False, timeout: float | None = None, **kwargs
):
super().wait_for_connection(all_signals, timeout)
# Prepare backend for TimePixFly
self.backend.on_connected()
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
@@ -639,8 +659,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
# Prepare TimePixFly backend
# -----------------
# Prepare backend for TimePixFly
self.backend.on_connected()
# Register the callback for processing data received by the backend
self.backend.add_callback(self.msg_buffer_callback)
self._poll_thread.start()
@@ -693,18 +711,18 @@ class Timepix(PSIDeviceBase, TimePixControl):
file_path = "/".join(self._full_path.split("/")[:-1])
file_name = self._full_path.split("/")[-1]
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
# self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
self.hdf.file_path.set(file_path).wait(5)
self.hdf.file_name.set(file_name).wait(5)
# Setup file writing for the total expected number of images
self.hdf.num_capture.set(self._n_images).wait(5)
self.hdf.capture.put(1)
self.file_event.put(
file_path=self._full_path,
done=False,
successful=False,
hinted_h5_entries={"data": "/entry/data/data"},
)
if self.hdf.enable.get() == 1:
self.hdf.capture.put(1)
self.file_event.put(
file_path=self._full_path,
done=False,
successful=False,
hinted_h5_entries={"data": "/entry/data/data"},
)
# -------------------------
# XES specific staging
@@ -799,12 +817,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
# Status Camera
status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
return_status = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
# Status Writer
st1 = CompareStatus(self.hdf.capture, ACQUIRESTATUS.DONE)
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
status_writer = st1 & st2 & status_written_images
status_writer = None
if self.hdf.enable.get() == 1:
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
st3 = ExceptionStatus(self.hdf.write_status, 0, operation="!=")
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
status_writer = st1 & st2 & status_written_images & st3
# Status Backend
status_backend = None
@@ -813,9 +834,9 @@ class Timepix(PSIDeviceBase, TimePixControl):
status_backend = self.backend.on_complete(status=status_backend)
# Combine the statuses
if status_backend is not None:
return_status = status_backend & status_camera & status_writer
else:
return_status = status_camera & status_writer
return_status = status_backend & return_status
if status_writer is not None:
return_status = return_status & status_writer
return_status.add_callback(self._complete_callback)
self.cancel_on_stop(return_status)
@@ -823,6 +844,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
def _complete_callback(self, status: CompareStatus) -> None:
"""Callback for when the device completes a scan."""
if self.hdf.enable.get() != 1: # TODO: Not sure if we should support disabled file writing.
return
if status.success:
self.file_event.put(
file_path=self._full_path, # pylint: disable:protected-access
@@ -849,11 +872,14 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_destroy(self):
"""Cleanup method to stop the device and clean up resources."""
self.cam.acquire.put(0)
self.hdf.capture.put(0)
self._poll_thread_kill_event.set()
self.backend.on_stop()
self.backend.on_destroy()
try:
self.cam.acquire.put(0)
self.hdf.capture.put(0)
self._poll_thread_kill_event.set()
self.backend.on_stop()
self.backend.on_destroy()
except Exception:
logger.warning(f"Failed to destroy {self.name}.")
# pylint: disable=protected-access
@@ -12,7 +12,6 @@ hooks for all the relevant ophyd interface, 'on_stage',
from __future__ import annotations
import json
import signal
import socket
import threading
import time
@@ -81,14 +80,14 @@ class TimepixFlyBackend:
###### Hooks for the PSIDeviceBase interface ######
###################################################
def on_connected(self):
def on_connected(self, timeout: float = 10):
"""Called if it is ensured that the device is connected."""
time_started = time.time()
logger.info("Connecting to Timepix Fly backend...")
try:
self.timepix_fly_client.on_connected()
self.timepix_fly_client.on_connected(timeout=timeout / 2)
status = self.start_data_server()
status.wait(timeout=5)
status.wait(timeout=timeout / 2)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error starting data server: {content}")
@@ -239,6 +238,7 @@ class TimepixFlyBackend:
def on_destroy(self):
"""Hook for on_destroy logic."""
time_started = time.time()
self.timepix_fly_client.shutdown()
self._data_thread_shutdown_event.set()
if self._data_thread is not None and self._data_thread.is_alive():
@@ -255,6 +255,9 @@ class TimepixFlyBackend:
except Exception:
content = traceback.format_exc()
logger.error(f"Error closing socket server: {content}")
logger.info(
f"Timepix Fly backend destroyed and resources cleaned up after {time.time() - time_started:.3f} seconds."
)
def on_stop(self):
"""Hook for on_stop logic."""
@@ -88,7 +88,7 @@ class TimepixFlyClient:
### Utility Methods ###
#############################
def on_connected(self) -> None:
def on_connected(self, timeout: float = 5) -> None:
"""
Called when the client is connected to the TimePix server.
This method can be overridden to perform actions when the client connects.
@@ -96,7 +96,7 @@ class TimepixFlyClient:
try:
self.stop_running_collection()
self.connect()
self.wait_for_connection(timeout=5)
self.wait_for_connection(timeout=timeout)
except Exception:
content = traceback.format_exc()
@@ -270,16 +270,18 @@ class TimepixFlyClient:
continue
if status in success:
dev_status.set_finished()
logger.debug(f"Status callback finished in succes: {status.value}")
logger.debug(f"Status callback finished in success: {status.value}")
self._status_callbacks.pop(cb_id)
elif status in error:
try:
last_error = self.last_error()
raise TimePixStatusError(
f"TimePixFly Backend state '{status.value}' is in list of specified errors {error}. Last error message: {last_error.message}"
f"TimePixFly state '{status.value}': {last_error.message}"
)
except Exception as e:
logger.error(f"Error in status callback from TimepixFly Backend: {e}")
logger.error(
f"Error in status callback for '{status.value}' from TimepixFly backend: {e}"
)
dev_status.set_exception(e)
self._status_callbacks.pop(cb_id)
# Reset the _started flag if the status is in CONFIG.
+355
View File
@@ -0,0 +1,355 @@
"""Unit tests for the Timepix device with a mocked backend."""
from __future__ import annotations
import threading
from types import SimpleNamespace
from unittest import mock
import numpy as np
import ophyd
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd import DeviceStatus, StatusBase
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from superxas_bec.devices.timepix.timepix import (
ACQUIRESTATUS,
DATASOURCE,
EXPOSUREMODE,
FILEWRITEMODE,
TDCEdge,
TDCOuput,
TRIGGERMODE,
TRIGGERSOURCE,
Timepix,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
NetAddresses,
PixelMap,
)
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
class FakeBackendClient:
"""Backend client double that can finish status callbacks on demand."""
def __init__(self):
self.status = TimePixFlyStatus.CONFIG
self._status_callbacks = {}
def add_status_callback(self, status, success, error, run=True):
"""Register status callback with optional immediate completion."""
if run and self.status in success:
status.set_finished()
return
self._status_callbacks[id(status)] = (status, success, error)
def emit_status(self, status_value: TimePixFlyStatus):
"""Resolve tracked status objects with a simulated backend state."""
self.status = status_value
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
with status._lock:
if status.done:
self._status_callbacks.pop(cb_id, None)
continue
if status_value in success:
status.set_finished()
self._status_callbacks.pop(cb_id, None)
elif status_value in error:
status.set_exception(RuntimeError(f"backend entered {status_value.value}"))
self._status_callbacks.pop(cb_id, None)
def get_net_addresses(self):
"""Return a deterministic backend raw-data address."""
return NetAddresses(
control="127.0.0.1:8452",
address="127.0.0.1:8451",
server="127.0.0.1:8080",
)
class FakeBackend:
"""Minimal backend double used to isolate Timepix from backend integration."""
def __init__(self, *args, **kwargs):
self.hostname = kwargs.get("hostname") or "localhost"
self.socket_port = kwargs.get("socket_port", 9876)
self.timepix_fly_client = FakeBackendClient()
self.on_connected = mock.Mock()
self.on_stage = mock.Mock()
self.on_stop = mock.Mock()
self.on_destroy = mock.Mock()
self.add_callback = mock.Mock()
self._trigger_status = StatusBase()
self._trigger_status.set_finished()
self._complete_status = StatusBase()
self._complete_status.set_finished()
def on_trigger(self):
"""Return a backend-prepared trigger status."""
return self._trigger_status
def on_trigger_finished(self):
"""Return the status that resolves when acquisition is complete."""
return self._complete_status
def on_complete(self, status=None):
"""Return a backend completion status."""
if status is None:
return self._complete_status
status.set_finished()
return status
def _finished_status(device=None):
"""Create a finished status for mocked signal set operations."""
status = DeviceStatus(device=device)
status.set_finished()
return status
def _force_signal_value(signal, value):
"""Set a mocked PV-backed signal value, including read-only EPICS signals."""
if hasattr(signal, "_read_pv"):
signal._read_pv.mock_data = value
return
signal.put(value)
def _message_value(signal):
"""Extract the signal payload from a BEC message signal."""
msg = signal.get()
return msg.signals[signal.name]["value"]
@pytest.fixture(scope="function")
def timepix():
"""Timepix device with mocked EPICS signals and a fully mocked backend."""
backend = FakeBackend()
scan_info = SimpleNamespace(
msg=SimpleNamespace(
scan_name="step_scan",
scan_parameters={"exp_time": 0.1, "frames_per_trigger": 2},
num_points=3,
)
)
with (
mock.patch.object(ophyd, "cl") as mock_cl,
mock.patch(
"superxas_bec.devices.timepix.timepix.TimepixFlyBackend", return_value=backend
),
):
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Timepix(
name="timepix",
prefix="X10DA-ES-TPX1:",
backend_rest_url="localhost:8452",
hostname="localhost",
socket_port=9876,
scan_info=scan_info,
device_manager=DMMock(),
)
patch_dual_pvs(dev)
dev.backend = backend
dev._poll_thread = mock.Mock()
for walk in dev.walk_signals():
signal = walk.item
if hasattr(signal, "set") and hasattr(signal, "put"):
signal.set = mock.Mock(side_effect=lambda value, _sig=signal, **_kw: (_force_signal_value(_sig, value), _finished_status(_sig))[1]) # type: ignore[method-assign]
yield dev
dev._poll_thread_kill_event.set()
@pytest.fixture(scope="function")
def pixel_map():
"""Small valid pixel map used for stage tests."""
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
def test_timepix_on_connected_configures_signals_and_registers_callback(timepix):
"""Connected hook should configure camera, file writer, and backend callback."""
timepix.on_connected()
assert timepix.cam.tdc1_enable.get() == 1
assert timepix.cam.tdc2_enable.get() == 1
assert timepix.cam.raw_enable.get() == 1
assert timepix.cam.tdc1_edge.get() == TDCEdge.RISING
assert timepix.cam.tdc1_output.get() == TDCOuput.ALL_CHANNELS
assert timepix.cam.trigger_mode.get() == TRIGGERMODE.INTERNAL
assert timepix.cam.trigger_source.get() == TRIGGERSOURCE.HDMI1_1
assert timepix.cam.exposure_mode.get() == EXPOSUREMODE.TIMED
assert timepix.cam.array_counter.get() == 0
assert timepix.hdf.enable.get() == "1"
assert timepix.hdf.file_write_mode.get() == FILEWRITEMODE.STREAM.value
assert timepix.hdf.auto_save.get() == 1
assert timepix.cam.array_callbacks.get() == 1
timepix.backend.add_callback.assert_called_once_with(timepix.msg_buffer_callback)
timepix._poll_thread.start.assert_called_once()
def test_timepix_on_stage_configures_camera_writer_and_mocked_backend(timepix, pixel_map):
"""Stage should configure camera settings and forward config to the backend."""
timepix._pixel_map = pixel_map
with (
mock.patch.object(timepix.hdf.enable, "get", return_value=1),
mock.patch(
"superxas_bec.devices.timepix.timepix.get_full_path",
return_value="/tmp/timepix/test_scan.h5",
),
):
timepix.on_stage()
assert timepix.cam.acquire_time.get() == pytest.approx(0.1 - timepix._readout_time)
assert timepix.cam.acquire_period.get() == pytest.approx(0.1)
assert timepix.cam.num_images.get() == 2
assert timepix.cam.data_source.get() == DATASOURCE.IMAGE
assert timepix.hdf.file_path.get() == "/tmp/timepix"
assert timepix.hdf.file_name.get() == "test_scan.h5"
assert timepix.hdf.num_capture.get() == 6
assert timepix.hdf.capture.get() == 1
assert timepix.cam.raw_file_template.get() == ""
assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451"
assert timepix.backend.on_stage.call_count == 1
other_config = timepix.backend.on_stage.call_args.kwargs["other_config"]
assert other_config.output_uri == "tcp:localhost:9876"
assert other_config.TRoiStep == timepix.troistep
assert other_config.TRoiN == timepix.troin
assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map
file_event = timepix.file_event.get()
assert file_event.file_path == "/tmp/timepix/test_scan.h5"
assert file_event.done is False
assert file_event.successful is False
def test_timepix_on_stage_without_xes_skips_backend_configuration(timepix):
"""When XES is disabled, the backend-specific stage call should be skipped."""
timepix.enable_xes = False
with mock.patch(
"superxas_bec.devices.timepix.timepix.get_full_path",
return_value="/tmp/timepix/no_xes_scan.h5",
):
timepix.on_stage()
timepix.backend.on_stage.assert_not_called()
def test_timepix_on_trigger_combines_camera_and_backend_status(timepix):
"""Trigger should arm the backend first and then drive the camera."""
backend_done = StatusBase()
backend_done.set_finished()
backend_finished = StatusBase()
timepix.backend._trigger_status = backend_done
timepix.backend._complete_status = backend_finished
status = timepix.on_trigger()
assert isinstance(status, StatusBase)
assert timepix.cam.acquire.get() == 1
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
backend_finished.set_finished()
status.wait(timeout=0.1)
assert status.done is True
assert status.success is True
def test_timepix_on_complete_marks_file_event_success(timepix):
"""Complete should wait for writer/backend completion and emit a success file event."""
timepix._full_path = "/tmp/timepix/final_scan.h5"
timepix._n_images = 3
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
timepix.hdf.capture.put(ACQUIRESTATUS.ACQUIRING)
timepix.hdf.write_file.put(ACQUIRESTATUS.ACQUIRING)
with mock.patch.object(timepix.hdf.enable, "get", return_value=1):
status = timepix.on_complete()
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
timepix.hdf.capture.put(ACQUIRESTATUS.DONE)
timepix.hdf.write_file.put(ACQUIRESTATUS.DONE)
timepix.hdf.write_status.put(0)
_force_signal_value(timepix.hdf.num_captured, 3)
status.wait(timeout=0.1)
file_event = timepix.file_event.get()
assert file_event.file_path == "/tmp/timepix/final_scan.h5"
assert file_event.done is True
assert file_event.successful is True
def test_timepix_msg_buffer_callback_updates_xes_signals(timepix):
"""The backend message callback should populate all exported XES data signals."""
start_frame = {
"type": "StartFrame",
"Mode": "TOA",
"TRoiStart": 0,
"TRoiStep": 1,
"TRoiN": 2,
"NumEnergyPoints": 8,
"save_interval": 1,
}
data_frames = [
{
"type": "XesData",
"period": 131000,
"totalEvents": 36,
"TDSpectra": list(range(16)),
"beforeROI": 0,
"afterROI": 0,
}
]
end_frame = {"type": "EndFrame", "error": "", "periods": 4}
timepix.troin = 2
timepix.msg_buffer_callback(start_frame, data_frames, end_frame)
expected_xes = np.array(
[[0, 1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]], dtype=np.float32
)
np.testing.assert_array_equal(_message_value(timepix.xes_data), expected_xes)
np.testing.assert_array_equal(
_message_value(timepix.xes_spectra), np.array([28, 92], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_energy_1), np.array([6, 38], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_energy_2), np.array([22, 54], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_data_accumulated_1), np.array([6, 38], dtype=np.float32)
)
np.testing.assert_array_equal(
_message_value(timepix.xes_data_accumulated_2), np.array([22, 54], dtype=np.float32)
)
assert _message_value(timepix.total_periods) == pytest.approx(4 / 131000)
np.testing.assert_array_equal(_message_value(timepix.tds_period), np.array([1.0]))
assert _message_value(timepix.total_events) == 36
def test_timepix_on_stop_stops_camera_writer_and_backend(timepix):
"""Stop should stop camera acquisition and delegate backend stop."""
timepix.cam.acquire.put(1)
timepix.hdf.capture.put(1)
timepix.on_stop()
assert timepix.cam.acquire.get() == 0
assert timepix.hdf.capture.get() == 0
timepix.backend.on_stop.assert_called_once()
def test_timepix_on_destroy_cleans_up_backend(timepix):
"""Destroy should stop polling and forward cleanup to the backend."""
timepix.on_destroy()
assert timepix._poll_thread_kill_event.is_set() is True
timepix.backend.on_stop.assert_called_once()
timepix.backend.on_destroy.assert_called_once()
+191 -7
View File
@@ -1,15 +1,199 @@
"""This module tests the Timepix Fly backend functionality."""
"""Unit tests for the Timepix Fly backend."""
from __future__ import annotations
import pytest
from types import SimpleNamespace
from unittest import mock
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
import pytest
from ophyd import StatusBase
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import (
TimepixFlyBackend,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
TimePixFlyStatus,
TimePixStatusError,
)
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
NetAddresses,
OtherConfigModel,
PixelMap,
)
class FakeTimepixFlyClient:
"""Minimal client double that can drive backend status callbacks."""
def __init__(self, rest_url: str, ws_url: str):
self.rest_url = rest_url
self.ws_url = ws_url
self.status = TimePixFlyStatus.CONFIG
self._status_callbacks = {}
self.error_message = "boom"
self.on_connected = mock.Mock()
self.shutdown = mock.Mock()
self.start = mock.Mock()
self.stop_running_collection = mock.Mock()
self.set_other_config = mock.Mock()
self.set_pixel_map = mock.Mock()
self.get_net_addresses = mock.Mock(
return_value=NetAddresses(
control="127.0.0.1:8452",
address="127.0.0.1:8451",
server="127.0.0.1:8080",
)
)
def add_status_callback(self, status, success, error, run=True):
"""Store callbacks and optionally resolve them immediately."""
if run:
if self.status in success:
status.set_finished()
return
if self.status in error:
status.set_exception(
TimePixStatusError(f"TimePixFly state '{self.status.value}': {self.error_message}")
)
return
self._status_callbacks[id(status)] = (status, success, error)
def last_error(self):
"""Return a lightweight error object."""
return SimpleNamespace(message=self.error_message)
def emit_status(self, status_value: TimePixFlyStatus):
"""Resolve stored status callbacks as if a websocket status update arrived."""
self.status = status_value
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
with status._lock:
if status.done:
self._status_callbacks.pop(cb_id, None)
continue
if status_value in success:
status.set_finished()
self._status_callbacks.pop(cb_id, None)
elif status_value in error:
status.set_exception(
TimePixStatusError(
f"TimePixFly state '{status_value.value}': {self.error_message}"
)
)
self._status_callbacks.pop(cb_id, None)
@pytest.fixture(scope="function")
def timepix_fly_backend():
"""Fixture for creating a Timepix Fly backend instance."""
backend = TimepixFlyBackend(backend_rest_url="http://localhost:8000")
yield backend
def backend_with_states():
"""Return a backend together with a helper that emits backend states."""
with mock.patch(
"superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend.TimepixFlyClient",
FakeTimepixFlyClient,
):
backend = TimepixFlyBackend(backend_rest_url="localhost:8452", hostname="localhost")
yield backend, backend.timepix_fly_client
backend.on_destroy()
@pytest.fixture(scope="function")
def pixel_map():
"""Small valid pixel map for backend unit tests."""
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
def test_timepix_fly_backend_stage_pushes_configuration(backend_with_states, pixel_map):
"""Stage should push both config objects to the client."""
backend, client = backend_with_states
other_config = OtherConfigModel(output_uri="tcp:localhost:9000", TRoiStep=2, TRoiN=16)
backend.on_stage(other_config=other_config, pixel_map=pixel_map)
client.set_other_config.assert_called_once_with(other_config)
client.set_pixel_map.assert_called_once_with(pixel_map)
def test_timepix_fly_backend_trigger_callback_success(backend_with_states):
"""Trigger status should resolve once the backend reports await_connection."""
backend, state_driver = backend_with_states
status = backend.on_trigger()
assert status.done is False
state_driver.emit_status(TimePixFlyStatus.AWAIT_CONNECTION)
status.wait(timeout=0.1)
assert status.done is True
assert status.success is True
def test_timepix_fly_backend_trigger_callback_error(backend_with_states):
"""Trigger status should fail when the backend reports an exception state."""
backend, state_driver = backend_with_states
status = backend.on_trigger()
state_driver.error_message = "failed to configure"
state_driver.emit_status(TimePixFlyStatus.EXCEPT)
with pytest.raises(TimePixStatusError, match="failed to configure"):
status.wait(timeout=0.1)
def test_timepix_fly_backend_complete_callback_success(backend_with_states):
"""Complete status should resolve when the backend goes back to config."""
backend, state_driver = backend_with_states
status = backend.on_complete()
assert status.done is True
assert status.success is True
state_driver.emit_status(TimePixFlyStatus.CONFIG)
def test_timepix_fly_backend_stop_cancels_tracked_statuses(backend_with_states):
"""Stopping the backend should fail all tracked statuses and stop collection."""
backend, client = backend_with_states
status = StatusBase()
backend.cancel_on_stop(status)
backend.on_stop()
client.stop_running_collection.assert_called_once()
with pytest.raises(RuntimeError, match="Stop called on device"):
status.wait(timeout=0.1)
def test_timepix_fly_backend_add_and_remove_callback(backend_with_states):
"""Callbacks can be registered and removed by id."""
backend, _ = backend_with_states
cb_id = backend.add_callback(lambda *_args, **_kwargs: None, kwd={"scan_id": 5})
stored_cb_id = next(iter(backend.callbacks))
assert str(stored_cb_id) == cb_id
backend.remove_callback(stored_cb_id)
assert stored_cb_id not in backend.callbacks
def test_timepix_fly_backend_decode_end_frame_runs_callbacks(backend_with_states):
"""The buffered frame callback should be invoked only once EndFrame arrives."""
backend, _ = backend_with_states
received = {}
def callback(start_frame, data_frames, end_frame, scan_id):
received["start_frame"] = start_frame
received["data_frames"] = data_frames
received["end_frame"] = end_frame
received["scan_id"] = scan_id
backend.add_callback(callback, kwd={"scan_id": 7})
backend._decode_received_data(
'{"type":"StartFrame","Mode":"TOA","TRoiStart":0,"TRoiStep":1,"TRoiN":2,"NumEnergyPoints":2,"save_interval":10}'
)
backend._decode_received_data(
'{"type":"XesData","period":1,"TDSpectra":[1,2,3,4],"totalEvents":4,"beforeROI":0,"afterROI":0}'
)
backend._decode_received_data('{"type":"EndFrame","error":"","periods":5}')
assert received["start_frame"]["type"] == "StartFrame"
assert received["data_frames"][0]["type"] == "XesData"
assert received["end_frame"]["type"] == "EndFrame"
assert received["scan_id"] == 7
assert backend._TimepixFlyBackend__msg_buffer == []