Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5b14a10063 | |||
| 19182daa47 | |||
| 67a26f231d |
@@ -19,11 +19,14 @@ from bec_lib.file_utils import get_full_path
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import ADBase
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import EpicsSignalRO, EpicsSignalWithRBV
|
||||
from ophyd import Signal
|
||||
from ophyd_devices import (
|
||||
AsyncSignal,
|
||||
CompareStatus,
|
||||
DeviceStatus,
|
||||
EpicsSignalRO,
|
||||
EpicsSignalWithRBV,
|
||||
ExceptionStatus,
|
||||
FileEventSignal,
|
||||
PreviewSignal,
|
||||
StatusBase,
|
||||
@@ -198,6 +201,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
"set_pixel_map",
|
||||
"set_pixel_map_from_json_file",
|
||||
"set_enable_xes",
|
||||
"set_enable_image_writing",
|
||||
]
|
||||
|
||||
xes_data = Cpt(
|
||||
@@ -511,6 +515,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
"""
|
||||
self.enable_xes = enable
|
||||
|
||||
def set_enable_image_writing(self, enable: bool) -> None:
|
||||
"""
|
||||
Enable or disable image writing to file through the HDF5 plugin.
|
||||
|
||||
Args:
|
||||
enable (bool): Whether to enable image writing.
|
||||
"""
|
||||
self.hdf.enable.set(1 if enable else 0).wait(timeout=self._pv_timeout)
|
||||
|
||||
@property
|
||||
def enable_xes(self) -> bool:
|
||||
"""Get whether XES data acquisition is enabled."""
|
||||
@@ -610,6 +623,13 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
self.cam.tdc2_edge.set(TDCEdge.RISING).wait(timeout=self._pv_timeout)
|
||||
self.cam.tdc2_output.set(TDCOuput.ALL_CHANNELS).wait(timeout=self._pv_timeout)
|
||||
|
||||
def wait_for_connection(
|
||||
self, all_signals: bool = False, timeout: float | None = None, **kwargs
|
||||
):
|
||||
super().wait_for_connection(all_signals, timeout)
|
||||
# Prepare backend for TimePixFly
|
||||
self.backend.on_connected()
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
@@ -639,8 +659,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
# Prepare TimePixFly backend
|
||||
# -----------------
|
||||
|
||||
# Prepare backend for TimePixFly
|
||||
self.backend.on_connected()
|
||||
# Register the callback for processing data received by the backend
|
||||
self.backend.add_callback(self.msg_buffer_callback)
|
||||
self._poll_thread.start()
|
||||
@@ -693,18 +711,18 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
file_path = "/".join(self._full_path.split("/")[:-1])
|
||||
file_name = self._full_path.split("/")[-1]
|
||||
self.cam.array_callbacks.set(1).wait(5) # Enable array callbacks
|
||||
# self.hdf.enable.set(1).wait(5) # Enable HDF5 plugin
|
||||
self.hdf.file_path.set(file_path).wait(5)
|
||||
self.hdf.file_name.set(file_name).wait(5)
|
||||
# Setup file writing for the total expected number of images
|
||||
self.hdf.num_capture.set(self._n_images).wait(5)
|
||||
self.hdf.capture.put(1)
|
||||
self.file_event.put(
|
||||
file_path=self._full_path,
|
||||
done=False,
|
||||
successful=False,
|
||||
hinted_h5_entries={"data": "/entry/data/data"},
|
||||
)
|
||||
if self.hdf.enable.get() == 1:
|
||||
self.hdf.capture.put(1)
|
||||
self.file_event.put(
|
||||
file_path=self._full_path,
|
||||
done=False,
|
||||
successful=False,
|
||||
hinted_h5_entries={"data": "/entry/data/data"},
|
||||
)
|
||||
|
||||
# -------------------------
|
||||
# XES specific staging
|
||||
@@ -799,12 +817,15 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
# Status Camera
|
||||
status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
|
||||
return_status = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
|
||||
# Status Writer
|
||||
st1 = CompareStatus(self.hdf.capture, ACQUIRESTATUS.DONE)
|
||||
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
|
||||
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
|
||||
status_writer = st1 & st2 & status_written_images
|
||||
status_writer = None
|
||||
if self.hdf.enable.get() == 1:
|
||||
st2 = CompareStatus(self.hdf.write_file, ACQUIRESTATUS.DONE)
|
||||
st3 = ExceptionStatus(self.hdf.write_status, 0, operation="!=")
|
||||
status_written_images = CompareStatus(self.hdf.num_captured, self._n_images)
|
||||
status_writer = st1 & st2 & status_written_images & st3
|
||||
|
||||
# Status Backend
|
||||
status_backend = None
|
||||
@@ -813,9 +834,9 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
status_backend = self.backend.on_complete(status=status_backend)
|
||||
# Combine the statuses
|
||||
if status_backend is not None:
|
||||
return_status = status_backend & status_camera & status_writer
|
||||
else:
|
||||
return_status = status_camera & status_writer
|
||||
return_status = status_backend & return_status
|
||||
if status_writer is not None:
|
||||
return_status = return_status & status_writer
|
||||
|
||||
return_status.add_callback(self._complete_callback)
|
||||
self.cancel_on_stop(return_status)
|
||||
@@ -823,6 +844,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
|
||||
def _complete_callback(self, status: CompareStatus) -> None:
|
||||
"""Callback for when the device completes a scan."""
|
||||
if self.hdf.enable.get() != 1: # TODO: Not sure if we should support disabled file writing.
|
||||
return
|
||||
if status.success:
|
||||
self.file_event.put(
|
||||
file_path=self._full_path, # pylint: disable:protected-access
|
||||
@@ -849,11 +872,14 @@ class Timepix(PSIDeviceBase, TimePixControl):
|
||||
|
||||
def on_destroy(self):
|
||||
"""Cleanup method to stop the device and clean up resources."""
|
||||
self.cam.acquire.put(0)
|
||||
self.hdf.capture.put(0)
|
||||
self._poll_thread_kill_event.set()
|
||||
self.backend.on_stop()
|
||||
self.backend.on_destroy()
|
||||
try:
|
||||
self.cam.acquire.put(0)
|
||||
self.hdf.capture.put(0)
|
||||
self._poll_thread_kill_event.set()
|
||||
self.backend.on_stop()
|
||||
self.backend.on_destroy()
|
||||
except Exception:
|
||||
logger.warning(f"Failed to destroy {self.name}.")
|
||||
|
||||
|
||||
# pylint: disable=protected-access
|
||||
|
||||
@@ -12,7 +12,6 @@ hooks for all the relevant ophyd interface, 'on_stage',
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import signal
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
@@ -81,14 +80,14 @@ class TimepixFlyBackend:
|
||||
###### Hooks for the PSIDeviceBase interface ######
|
||||
###################################################
|
||||
|
||||
def on_connected(self):
|
||||
def on_connected(self, timeout: float = 10):
|
||||
"""Called if it is ensured that the device is connected."""
|
||||
time_started = time.time()
|
||||
logger.info("Connecting to Timepix Fly backend...")
|
||||
try:
|
||||
self.timepix_fly_client.on_connected()
|
||||
self.timepix_fly_client.on_connected(timeout=timeout / 2)
|
||||
status = self.start_data_server()
|
||||
status.wait(timeout=5)
|
||||
status.wait(timeout=timeout / 2)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error starting data server: {content}")
|
||||
@@ -239,6 +238,7 @@ class TimepixFlyBackend:
|
||||
|
||||
def on_destroy(self):
|
||||
"""Hook for on_destroy logic."""
|
||||
time_started = time.time()
|
||||
self.timepix_fly_client.shutdown()
|
||||
self._data_thread_shutdown_event.set()
|
||||
if self._data_thread is not None and self._data_thread.is_alive():
|
||||
@@ -255,6 +255,9 @@ class TimepixFlyBackend:
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error closing socket server: {content}")
|
||||
logger.info(
|
||||
f"Timepix Fly backend destroyed and resources cleaned up after {time.time() - time_started:.3f} seconds."
|
||||
)
|
||||
|
||||
def on_stop(self):
|
||||
"""Hook for on_stop logic."""
|
||||
|
||||
@@ -88,7 +88,7 @@ class TimepixFlyClient:
|
||||
### Utility Methods ###
|
||||
#############################
|
||||
|
||||
def on_connected(self) -> None:
|
||||
def on_connected(self, timeout: float = 5) -> None:
|
||||
"""
|
||||
Called when the client is connected to the TimePix server.
|
||||
This method can be overridden to perform actions when the client connects.
|
||||
@@ -96,7 +96,7 @@ class TimepixFlyClient:
|
||||
try:
|
||||
self.stop_running_collection()
|
||||
self.connect()
|
||||
self.wait_for_connection(timeout=5)
|
||||
self.wait_for_connection(timeout=timeout)
|
||||
|
||||
except Exception:
|
||||
content = traceback.format_exc()
|
||||
@@ -270,16 +270,18 @@ class TimepixFlyClient:
|
||||
continue
|
||||
if status in success:
|
||||
dev_status.set_finished()
|
||||
logger.debug(f"Status callback finished in succes: {status.value}")
|
||||
logger.debug(f"Status callback finished in success: {status.value}")
|
||||
self._status_callbacks.pop(cb_id)
|
||||
elif status in error:
|
||||
try:
|
||||
last_error = self.last_error()
|
||||
raise TimePixStatusError(
|
||||
f"TimePixFly Backend state '{status.value}' is in list of specified errors {error}. Last error message: {last_error.message}"
|
||||
f"TimePixFly state '{status.value}': {last_error.message}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in status callback from TimepixFly Backend: {e}")
|
||||
logger.error(
|
||||
f"Error in status callback for '{status.value}' from TimepixFly backend: {e}"
|
||||
)
|
||||
dev_status.set_exception(e)
|
||||
self._status_callbacks.pop(cb_id)
|
||||
# Reset the _started flag if the status is in CONFIG.
|
||||
|
||||
@@ -0,0 +1,355 @@
|
||||
"""Unit tests for the Timepix device with a mocked backend."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import ophyd
|
||||
import pytest
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
||||
|
||||
from superxas_bec.devices.timepix.timepix import (
|
||||
ACQUIRESTATUS,
|
||||
DATASOURCE,
|
||||
EXPOSUREMODE,
|
||||
FILEWRITEMODE,
|
||||
TDCEdge,
|
||||
TDCOuput,
|
||||
TRIGGERMODE,
|
||||
TRIGGERSOURCE,
|
||||
Timepix,
|
||||
)
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
|
||||
NetAddresses,
|
||||
PixelMap,
|
||||
)
|
||||
|
||||
# pylint: disable=protected-access
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
|
||||
class FakeBackendClient:
|
||||
"""Backend client double that can finish status callbacks on demand."""
|
||||
|
||||
def __init__(self):
|
||||
self.status = TimePixFlyStatus.CONFIG
|
||||
self._status_callbacks = {}
|
||||
|
||||
def add_status_callback(self, status, success, error, run=True):
|
||||
"""Register status callback with optional immediate completion."""
|
||||
if run and self.status in success:
|
||||
status.set_finished()
|
||||
return
|
||||
self._status_callbacks[id(status)] = (status, success, error)
|
||||
|
||||
def emit_status(self, status_value: TimePixFlyStatus):
|
||||
"""Resolve tracked status objects with a simulated backend state."""
|
||||
self.status = status_value
|
||||
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
|
||||
with status._lock:
|
||||
if status.done:
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
continue
|
||||
if status_value in success:
|
||||
status.set_finished()
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
elif status_value in error:
|
||||
status.set_exception(RuntimeError(f"backend entered {status_value.value}"))
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
|
||||
def get_net_addresses(self):
|
||||
"""Return a deterministic backend raw-data address."""
|
||||
return NetAddresses(
|
||||
control="127.0.0.1:8452",
|
||||
address="127.0.0.1:8451",
|
||||
server="127.0.0.1:8080",
|
||||
)
|
||||
|
||||
|
||||
class FakeBackend:
|
||||
"""Minimal backend double used to isolate Timepix from backend integration."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.hostname = kwargs.get("hostname") or "localhost"
|
||||
self.socket_port = kwargs.get("socket_port", 9876)
|
||||
self.timepix_fly_client = FakeBackendClient()
|
||||
self.on_connected = mock.Mock()
|
||||
self.on_stage = mock.Mock()
|
||||
self.on_stop = mock.Mock()
|
||||
self.on_destroy = mock.Mock()
|
||||
self.add_callback = mock.Mock()
|
||||
self._trigger_status = StatusBase()
|
||||
self._trigger_status.set_finished()
|
||||
self._complete_status = StatusBase()
|
||||
self._complete_status.set_finished()
|
||||
|
||||
def on_trigger(self):
|
||||
"""Return a backend-prepared trigger status."""
|
||||
return self._trigger_status
|
||||
|
||||
def on_trigger_finished(self):
|
||||
"""Return the status that resolves when acquisition is complete."""
|
||||
return self._complete_status
|
||||
|
||||
def on_complete(self, status=None):
|
||||
"""Return a backend completion status."""
|
||||
if status is None:
|
||||
return self._complete_status
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
def _finished_status(device=None):
|
||||
"""Create a finished status for mocked signal set operations."""
|
||||
status = DeviceStatus(device=device)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
def _force_signal_value(signal, value):
|
||||
"""Set a mocked PV-backed signal value, including read-only EPICS signals."""
|
||||
if hasattr(signal, "_read_pv"):
|
||||
signal._read_pv.mock_data = value
|
||||
return
|
||||
signal.put(value)
|
||||
|
||||
|
||||
def _message_value(signal):
|
||||
"""Extract the signal payload from a BEC message signal."""
|
||||
msg = signal.get()
|
||||
return msg.signals[signal.name]["value"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def timepix():
|
||||
"""Timepix device with mocked EPICS signals and a fully mocked backend."""
|
||||
backend = FakeBackend()
|
||||
scan_info = SimpleNamespace(
|
||||
msg=SimpleNamespace(
|
||||
scan_name="step_scan",
|
||||
scan_parameters={"exp_time": 0.1, "frames_per_trigger": 2},
|
||||
num_points=3,
|
||||
)
|
||||
)
|
||||
with (
|
||||
mock.patch.object(ophyd, "cl") as mock_cl,
|
||||
mock.patch(
|
||||
"superxas_bec.devices.timepix.timepix.TimepixFlyBackend", return_value=backend
|
||||
),
|
||||
):
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
dev = Timepix(
|
||||
name="timepix",
|
||||
prefix="X10DA-ES-TPX1:",
|
||||
backend_rest_url="localhost:8452",
|
||||
hostname="localhost",
|
||||
socket_port=9876,
|
||||
scan_info=scan_info,
|
||||
device_manager=DMMock(),
|
||||
)
|
||||
patch_dual_pvs(dev)
|
||||
dev.backend = backend
|
||||
dev._poll_thread = mock.Mock()
|
||||
for walk in dev.walk_signals():
|
||||
signal = walk.item
|
||||
if hasattr(signal, "set") and hasattr(signal, "put"):
|
||||
signal.set = mock.Mock(side_effect=lambda value, _sig=signal, **_kw: (_force_signal_value(_sig, value), _finished_status(_sig))[1]) # type: ignore[method-assign]
|
||||
yield dev
|
||||
dev._poll_thread_kill_event.set()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pixel_map():
|
||||
"""Small valid pixel map used for stage tests."""
|
||||
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
|
||||
|
||||
|
||||
def test_timepix_on_connected_configures_signals_and_registers_callback(timepix):
|
||||
"""Connected hook should configure camera, file writer, and backend callback."""
|
||||
timepix.on_connected()
|
||||
|
||||
assert timepix.cam.tdc1_enable.get() == 1
|
||||
assert timepix.cam.tdc2_enable.get() == 1
|
||||
assert timepix.cam.raw_enable.get() == 1
|
||||
assert timepix.cam.tdc1_edge.get() == TDCEdge.RISING
|
||||
assert timepix.cam.tdc1_output.get() == TDCOuput.ALL_CHANNELS
|
||||
assert timepix.cam.trigger_mode.get() == TRIGGERMODE.INTERNAL
|
||||
assert timepix.cam.trigger_source.get() == TRIGGERSOURCE.HDMI1_1
|
||||
assert timepix.cam.exposure_mode.get() == EXPOSUREMODE.TIMED
|
||||
assert timepix.cam.array_counter.get() == 0
|
||||
assert timepix.hdf.enable.get() == "1"
|
||||
assert timepix.hdf.file_write_mode.get() == FILEWRITEMODE.STREAM.value
|
||||
assert timepix.hdf.auto_save.get() == 1
|
||||
assert timepix.cam.array_callbacks.get() == 1
|
||||
timepix.backend.add_callback.assert_called_once_with(timepix.msg_buffer_callback)
|
||||
timepix._poll_thread.start.assert_called_once()
|
||||
|
||||
|
||||
def test_timepix_on_stage_configures_camera_writer_and_mocked_backend(timepix, pixel_map):
|
||||
"""Stage should configure camera settings and forward config to the backend."""
|
||||
timepix._pixel_map = pixel_map
|
||||
|
||||
with (
|
||||
mock.patch.object(timepix.hdf.enable, "get", return_value=1),
|
||||
mock.patch(
|
||||
"superxas_bec.devices.timepix.timepix.get_full_path",
|
||||
return_value="/tmp/timepix/test_scan.h5",
|
||||
),
|
||||
):
|
||||
timepix.on_stage()
|
||||
|
||||
assert timepix.cam.acquire_time.get() == pytest.approx(0.1 - timepix._readout_time)
|
||||
assert timepix.cam.acquire_period.get() == pytest.approx(0.1)
|
||||
assert timepix.cam.num_images.get() == 2
|
||||
assert timepix.cam.data_source.get() == DATASOURCE.IMAGE
|
||||
assert timepix.hdf.file_path.get() == "/tmp/timepix"
|
||||
assert timepix.hdf.file_name.get() == "test_scan.h5"
|
||||
assert timepix.hdf.num_capture.get() == 6
|
||||
assert timepix.hdf.capture.get() == 1
|
||||
assert timepix.cam.raw_file_template.get() == ""
|
||||
assert timepix.cam.raw_file_path.get() == "tcp://connect@127.0.0.1:8451"
|
||||
assert timepix.backend.on_stage.call_count == 1
|
||||
other_config = timepix.backend.on_stage.call_args.kwargs["other_config"]
|
||||
assert other_config.output_uri == "tcp:localhost:9876"
|
||||
assert other_config.TRoiStep == timepix.troistep
|
||||
assert other_config.TRoiN == timepix.troin
|
||||
assert timepix.backend.on_stage.call_args.kwargs["pixel_map"] == pixel_map
|
||||
file_event = timepix.file_event.get()
|
||||
assert file_event.file_path == "/tmp/timepix/test_scan.h5"
|
||||
assert file_event.done is False
|
||||
assert file_event.successful is False
|
||||
|
||||
|
||||
def test_timepix_on_stage_without_xes_skips_backend_configuration(timepix):
|
||||
"""When XES is disabled, the backend-specific stage call should be skipped."""
|
||||
timepix.enable_xes = False
|
||||
|
||||
with mock.patch(
|
||||
"superxas_bec.devices.timepix.timepix.get_full_path",
|
||||
return_value="/tmp/timepix/no_xes_scan.h5",
|
||||
):
|
||||
timepix.on_stage()
|
||||
|
||||
timepix.backend.on_stage.assert_not_called()
|
||||
|
||||
|
||||
def test_timepix_on_trigger_combines_camera_and_backend_status(timepix):
|
||||
"""Trigger should arm the backend first and then drive the camera."""
|
||||
backend_done = StatusBase()
|
||||
backend_done.set_finished()
|
||||
backend_finished = StatusBase()
|
||||
timepix.backend._trigger_status = backend_done
|
||||
timepix.backend._complete_status = backend_finished
|
||||
|
||||
status = timepix.on_trigger()
|
||||
|
||||
assert isinstance(status, StatusBase)
|
||||
assert timepix.cam.acquire.get() == 1
|
||||
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
|
||||
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
|
||||
backend_finished.set_finished()
|
||||
status.wait(timeout=0.1)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
|
||||
|
||||
def test_timepix_on_complete_marks_file_event_success(timepix):
|
||||
"""Complete should wait for writer/backend completion and emit a success file event."""
|
||||
timepix._full_path = "/tmp/timepix/final_scan.h5"
|
||||
timepix._n_images = 3
|
||||
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.ACQUIRING)
|
||||
timepix.hdf.capture.put(ACQUIRESTATUS.ACQUIRING)
|
||||
timepix.hdf.write_file.put(ACQUIRESTATUS.ACQUIRING)
|
||||
|
||||
with mock.patch.object(timepix.hdf.enable, "get", return_value=1):
|
||||
status = timepix.on_complete()
|
||||
|
||||
_force_signal_value(timepix.cam.acquire_busy, ACQUIRESTATUS.DONE)
|
||||
timepix.hdf.capture.put(ACQUIRESTATUS.DONE)
|
||||
timepix.hdf.write_file.put(ACQUIRESTATUS.DONE)
|
||||
timepix.hdf.write_status.put(0)
|
||||
_force_signal_value(timepix.hdf.num_captured, 3)
|
||||
status.wait(timeout=0.1)
|
||||
|
||||
file_event = timepix.file_event.get()
|
||||
assert file_event.file_path == "/tmp/timepix/final_scan.h5"
|
||||
assert file_event.done is True
|
||||
assert file_event.successful is True
|
||||
|
||||
|
||||
def test_timepix_msg_buffer_callback_updates_xes_signals(timepix):
|
||||
"""The backend message callback should populate all exported XES data signals."""
|
||||
start_frame = {
|
||||
"type": "StartFrame",
|
||||
"Mode": "TOA",
|
||||
"TRoiStart": 0,
|
||||
"TRoiStep": 1,
|
||||
"TRoiN": 2,
|
||||
"NumEnergyPoints": 8,
|
||||
"save_interval": 1,
|
||||
}
|
||||
data_frames = [
|
||||
{
|
||||
"type": "XesData",
|
||||
"period": 131000,
|
||||
"totalEvents": 36,
|
||||
"TDSpectra": list(range(16)),
|
||||
"beforeROI": 0,
|
||||
"afterROI": 0,
|
||||
}
|
||||
]
|
||||
end_frame = {"type": "EndFrame", "error": "", "periods": 4}
|
||||
timepix.troin = 2
|
||||
|
||||
timepix.msg_buffer_callback(start_frame, data_frames, end_frame)
|
||||
|
||||
expected_xes = np.array(
|
||||
[[0, 1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13, 14, 15]], dtype=np.float32
|
||||
)
|
||||
np.testing.assert_array_equal(_message_value(timepix.xes_data), expected_xes)
|
||||
np.testing.assert_array_equal(
|
||||
_message_value(timepix.xes_spectra), np.array([28, 92], dtype=np.float32)
|
||||
)
|
||||
np.testing.assert_array_equal(
|
||||
_message_value(timepix.xes_energy_1), np.array([6, 38], dtype=np.float32)
|
||||
)
|
||||
np.testing.assert_array_equal(
|
||||
_message_value(timepix.xes_energy_2), np.array([22, 54], dtype=np.float32)
|
||||
)
|
||||
np.testing.assert_array_equal(
|
||||
_message_value(timepix.xes_data_accumulated_1), np.array([6, 38], dtype=np.float32)
|
||||
)
|
||||
np.testing.assert_array_equal(
|
||||
_message_value(timepix.xes_data_accumulated_2), np.array([22, 54], dtype=np.float32)
|
||||
)
|
||||
assert _message_value(timepix.total_periods) == pytest.approx(4 / 131000)
|
||||
np.testing.assert_array_equal(_message_value(timepix.tds_period), np.array([1.0]))
|
||||
assert _message_value(timepix.total_events) == 36
|
||||
|
||||
|
||||
def test_timepix_on_stop_stops_camera_writer_and_backend(timepix):
|
||||
"""Stop should stop camera acquisition and delegate backend stop."""
|
||||
timepix.cam.acquire.put(1)
|
||||
timepix.hdf.capture.put(1)
|
||||
|
||||
timepix.on_stop()
|
||||
|
||||
assert timepix.cam.acquire.get() == 0
|
||||
assert timepix.hdf.capture.get() == 0
|
||||
timepix.backend.on_stop.assert_called_once()
|
||||
|
||||
|
||||
def test_timepix_on_destroy_cleans_up_backend(timepix):
|
||||
"""Destroy should stop polling and forward cleanup to the backend."""
|
||||
timepix.on_destroy()
|
||||
|
||||
assert timepix._poll_thread_kill_event.is_set() is True
|
||||
timepix.backend.on_stop.assert_called_once()
|
||||
timepix.backend.on_destroy.assert_called_once()
|
||||
@@ -1,15 +1,199 @@
|
||||
"""This module tests the Timepix Fly backend functionality."""
|
||||
"""Unit tests for the Timepix Fly backend."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
|
||||
import pytest
|
||||
from ophyd import StatusBase
|
||||
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import (
|
||||
TimepixFlyBackend,
|
||||
)
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
|
||||
TimePixFlyStatus,
|
||||
TimePixStatusError,
|
||||
)
|
||||
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
|
||||
NetAddresses,
|
||||
OtherConfigModel,
|
||||
PixelMap,
|
||||
)
|
||||
|
||||
|
||||
class FakeTimepixFlyClient:
|
||||
"""Minimal client double that can drive backend status callbacks."""
|
||||
|
||||
def __init__(self, rest_url: str, ws_url: str):
|
||||
self.rest_url = rest_url
|
||||
self.ws_url = ws_url
|
||||
self.status = TimePixFlyStatus.CONFIG
|
||||
self._status_callbacks = {}
|
||||
self.error_message = "boom"
|
||||
self.on_connected = mock.Mock()
|
||||
self.shutdown = mock.Mock()
|
||||
self.start = mock.Mock()
|
||||
self.stop_running_collection = mock.Mock()
|
||||
self.set_other_config = mock.Mock()
|
||||
self.set_pixel_map = mock.Mock()
|
||||
self.get_net_addresses = mock.Mock(
|
||||
return_value=NetAddresses(
|
||||
control="127.0.0.1:8452",
|
||||
address="127.0.0.1:8451",
|
||||
server="127.0.0.1:8080",
|
||||
)
|
||||
)
|
||||
|
||||
def add_status_callback(self, status, success, error, run=True):
|
||||
"""Store callbacks and optionally resolve them immediately."""
|
||||
if run:
|
||||
if self.status in success:
|
||||
status.set_finished()
|
||||
return
|
||||
if self.status in error:
|
||||
status.set_exception(
|
||||
TimePixStatusError(f"TimePixFly state '{self.status.value}': {self.error_message}")
|
||||
)
|
||||
return
|
||||
self._status_callbacks[id(status)] = (status, success, error)
|
||||
|
||||
def last_error(self):
|
||||
"""Return a lightweight error object."""
|
||||
return SimpleNamespace(message=self.error_message)
|
||||
|
||||
def emit_status(self, status_value: TimePixFlyStatus):
|
||||
"""Resolve stored status callbacks as if a websocket status update arrived."""
|
||||
self.status = status_value
|
||||
for cb_id, (status, success, error) in list(self._status_callbacks.items()):
|
||||
with status._lock:
|
||||
if status.done:
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
continue
|
||||
if status_value in success:
|
||||
status.set_finished()
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
elif status_value in error:
|
||||
status.set_exception(
|
||||
TimePixStatusError(
|
||||
f"TimePixFly state '{status_value.value}': {self.error_message}"
|
||||
)
|
||||
)
|
||||
self._status_callbacks.pop(cb_id, None)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def timepix_fly_backend():
|
||||
"""Fixture for creating a Timepix Fly backend instance."""
|
||||
backend = TimepixFlyBackend(backend_rest_url="http://localhost:8000")
|
||||
yield backend
|
||||
def backend_with_states():
|
||||
"""Return a backend together with a helper that emits backend states."""
|
||||
with mock.patch(
|
||||
"superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend.TimepixFlyClient",
|
||||
FakeTimepixFlyClient,
|
||||
):
|
||||
backend = TimepixFlyBackend(backend_rest_url="localhost:8452", hostname="localhost")
|
||||
yield backend, backend.timepix_fly_client
|
||||
backend.on_destroy()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pixel_map():
|
||||
"""Small valid pixel map for backend unit tests."""
|
||||
return PixelMap(chips=[[{"i": 0, "p": 0, "f": [1.0]}]])
|
||||
|
||||
|
||||
def test_timepix_fly_backend_stage_pushes_configuration(backend_with_states, pixel_map):
|
||||
"""Stage should push both config objects to the client."""
|
||||
backend, client = backend_with_states
|
||||
other_config = OtherConfigModel(output_uri="tcp:localhost:9000", TRoiStep=2, TRoiN=16)
|
||||
|
||||
backend.on_stage(other_config=other_config, pixel_map=pixel_map)
|
||||
|
||||
client.set_other_config.assert_called_once_with(other_config)
|
||||
client.set_pixel_map.assert_called_once_with(pixel_map)
|
||||
|
||||
|
||||
def test_timepix_fly_backend_trigger_callback_success(backend_with_states):
|
||||
"""Trigger status should resolve once the backend reports await_connection."""
|
||||
backend, state_driver = backend_with_states
|
||||
|
||||
status = backend.on_trigger()
|
||||
assert status.done is False
|
||||
|
||||
state_driver.emit_status(TimePixFlyStatus.AWAIT_CONNECTION)
|
||||
|
||||
status.wait(timeout=0.1)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
|
||||
|
||||
def test_timepix_fly_backend_trigger_callback_error(backend_with_states):
|
||||
"""Trigger status should fail when the backend reports an exception state."""
|
||||
backend, state_driver = backend_with_states
|
||||
|
||||
status = backend.on_trigger()
|
||||
state_driver.error_message = "failed to configure"
|
||||
state_driver.emit_status(TimePixFlyStatus.EXCEPT)
|
||||
|
||||
with pytest.raises(TimePixStatusError, match="failed to configure"):
|
||||
status.wait(timeout=0.1)
|
||||
|
||||
|
||||
def test_timepix_fly_backend_complete_callback_success(backend_with_states):
|
||||
"""Complete status should resolve when the backend goes back to config."""
|
||||
backend, state_driver = backend_with_states
|
||||
|
||||
status = backend.on_complete()
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
state_driver.emit_status(TimePixFlyStatus.CONFIG)
|
||||
|
||||
|
||||
def test_timepix_fly_backend_stop_cancels_tracked_statuses(backend_with_states):
|
||||
"""Stopping the backend should fail all tracked statuses and stop collection."""
|
||||
backend, client = backend_with_states
|
||||
status = StatusBase()
|
||||
backend.cancel_on_stop(status)
|
||||
|
||||
backend.on_stop()
|
||||
|
||||
client.stop_running_collection.assert_called_once()
|
||||
with pytest.raises(RuntimeError, match="Stop called on device"):
|
||||
status.wait(timeout=0.1)
|
||||
|
||||
|
||||
def test_timepix_fly_backend_add_and_remove_callback(backend_with_states):
|
||||
"""Callbacks can be registered and removed by id."""
|
||||
backend, _ = backend_with_states
|
||||
|
||||
cb_id = backend.add_callback(lambda *_args, **_kwargs: None, kwd={"scan_id": 5})
|
||||
|
||||
stored_cb_id = next(iter(backend.callbacks))
|
||||
assert str(stored_cb_id) == cb_id
|
||||
backend.remove_callback(stored_cb_id)
|
||||
assert stored_cb_id not in backend.callbacks
|
||||
|
||||
|
||||
def test_timepix_fly_backend_decode_end_frame_runs_callbacks(backend_with_states):
|
||||
"""The buffered frame callback should be invoked only once EndFrame arrives."""
|
||||
backend, _ = backend_with_states
|
||||
received = {}
|
||||
|
||||
def callback(start_frame, data_frames, end_frame, scan_id):
|
||||
received["start_frame"] = start_frame
|
||||
received["data_frames"] = data_frames
|
||||
received["end_frame"] = end_frame
|
||||
received["scan_id"] = scan_id
|
||||
|
||||
backend.add_callback(callback, kwd={"scan_id": 7})
|
||||
backend._decode_received_data(
|
||||
'{"type":"StartFrame","Mode":"TOA","TRoiStart":0,"TRoiStep":1,"TRoiN":2,"NumEnergyPoints":2,"save_interval":10}'
|
||||
)
|
||||
backend._decode_received_data(
|
||||
'{"type":"XesData","period":1,"TDSpectra":[1,2,3,4],"totalEvents":4,"beforeROI":0,"afterROI":0}'
|
||||
)
|
||||
backend._decode_received_data('{"type":"EndFrame","error":"","periods":5}')
|
||||
|
||||
assert received["start_frame"]["type"] == "StartFrame"
|
||||
assert received["data_frames"][0]["type"] == "XesData"
|
||||
assert received["end_frame"]["type"] == "EndFrame"
|
||||
assert received["scan_id"] == 7
|
||||
assert backend._TimepixFlyBackend__msg_buffer == []
|
||||
|
||||
Reference in New Issue
Block a user