fix: Cleanup
All checks were successful
CI for superxas_bec / test (push) Successful in 33s
CI for superxas_bec / test (pull_request) Successful in 36s

This commit is contained in:
2025-11-30 14:06:17 +01:00
parent f8929623b0
commit 4bdce94e36
3 changed files with 208 additions and 134 deletions

View File

@@ -8,6 +8,7 @@ interface in EPICS, which is implemented via the 'ASItpxCam' class.
from __future__ import annotations
import enum
import os
import threading
import time
import traceback
@@ -17,13 +18,20 @@ import numpy as np
from bec_lib.logger import bec_logger
from ophyd import ADBase
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind, StatusBase
from ophyd_devices import AsyncSignal, CompareStatus, PreviewSignal, TransitionStatus
from ophyd_devices import (
AsyncSignal,
CompareStatus,
DeviceStatus,
PreviewSignal,
StatusBase,
TransitionStatus,
)
from ophyd_devices.devices.areadetector.cam import ASItpxCam
from ophyd_devices.devices.areadetector.plugins import HDF5Plugin_V35, ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
import superxas_bec.devices.timepix.default_pixel_maps as _default_pixel_maps
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_backend import TimepixFlyBackend
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import TimePixFlyStatus
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface import (
@@ -116,6 +124,27 @@ class DATASOURCE(int, enum.Enum):
IMAGE = 2
def load_pixel_map_from_json(file_path: str) -> PixelMap:
"""Load a pixel map from a JSON file.
Args:
file_path (str): Path to the JSON file containing the pixel map.
Returns:
PixelMap: The loaded pixel map.
"""
# Check if path exists
if not os.path.exists(file_path):
raise FileNotFoundError(f"Pixel map file not found: {file_path}")
try:
with open(file_path, "r", encoding="utf-8") as file:
pixel_map_str = file.read()
pixel_map = PixelMap.model_validate_json(pixel_map_str)
except Exception as exc:
raise ValueError(f"Failed to load pixel map from {file_path}: {exc}") from exc
return pixel_map
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-locals
class TimePixControl(ADBase):
"""Interface for the TimePix EPICS control of the TimePix detector."""
@@ -123,10 +152,11 @@ class TimePixControl(ADBase):
cam = Cpt(ASItpxCam, "cam1:")
image = Cpt(ImagePlugin_V35, "image1:")
hdf = Cpt(HDF5Plugin_V35, "HDF1:")
# latest hdf5 plugin
# latest image plugin
DEFAULT_PIXEL_MAP = os.path.join(
os.path.dirname(_default_pixel_maps.__file__), "timepix_8_chips_single_energy_per_chip.json"
)
class Timepix(PSIDeviceBase, TimePixControl):
@@ -137,22 +167,55 @@ class Timepix(PSIDeviceBase, TimePixControl):
The hostname needs to be set to the name of this machine, e.h. x10da-bec-001.psi.ch.
"""
_DETECTOR_SHAPE = (512, 512) # Shape of the TimePix detector
USER_ACCESS = ["troin", "troistep", "get_pixel_map", "set_pixel_map"]
MIN_DETECTOR_READOUT_TIME = 2.1e-3 # Minimum readout time in seconds for ASI TimePix detector
# TODO Check names with beamline team, async signals current receive a nested data name structure
xes_data = Cpt(AsyncSignal, name="xes_data", ndim=2, max_size=1000)
xes_spectra = Cpt(AsyncSignal, name="xes_spectra", ndim=1, max_size=1000)
tds_period = Cpt(AsyncSignal, name="tds_period", ndim=0, async_update={"type": "add", "max_shape": [None]}, max_size=1000)
tds_total_events = Cpt(AsyncSignal, name="tds_total_events", ndim=0, async_update={"type": "add", "max_shape": [None]}, max_size=1000)
# # Here, we currently inherit a nested name structure --> xes_info_tds_period, xes_info_tds_total_events
# TODO adapt detector shape here
_DETECTOR_SHAPE = (512, 512) # Shape of the TimePix detector
USER_ACCESS = [
"troin",
"troistep",
"get_pixel_map",
"set_pixel_map",
"set_pixel_map_from_json_file",
]
xes_data = Cpt(
AsyncSignal,
name="xes_data",
ndim=2,
max_size=1000,
doc="Full XES data, 2D image with energypoints vs time bins.",
)
xes_spectra = Cpt(
AsyncSignal,
name="xes_spectra",
ndim=1,
max_size=1000,
doc="Static 1D spectra, integrated over time bins.",
)
tds_period = Cpt(
AsyncSignal,
name="tds_period",
ndim=0,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="TDS period recorded by the TimePixFly backend detector.",
)
tds_total_events = Cpt(
AsyncSignal,
name="tds_total_events",
ndim=0,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
doc="Total events recorded by the TimePixFly backend detector.",
)
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=0, # Check this
doc="Preview signal for the Pilatus Detector",
num_rotation_90=0, # TODO check the orientation
doc="Preview signal of the TimePix detector.",
)
def __init__(
@@ -182,23 +245,11 @@ class Timepix(PSIDeviceBase, TimePixControl):
device_manager: Device manager instance, if available.
**kwargs: Additional keyword arguments for the base class.
"""
self.backend = TimepixFlyBackend(
backend_rest_url=backend_rest_url, hostname=hostname, socket_port=socket_port
)
self._pixel_map = PixelMap(
chips=[
[{"i": 256 ^ 2 - 1, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255 * 256, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255, "p": [1, 2], "f": [0.5, 0.5]}],
[{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}],
[{"i": 256 ^ 2 - 1, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255 * 256, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255, "p": [1, 2], "f": [0.5, 0.5]}],
[{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}],
]
)
self._n_energy_points = 3
self._pixel_map = None
self._troistep = 1
self._troin = 5000
super().__init__(
@@ -208,14 +259,17 @@ class Timepix(PSIDeviceBase, TimePixControl):
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
self._poll_thread_kill_event = threading.Event()
self._poll_rate = 1 # Poll rate in Hz
self._poll_rate = (
5 # Image poll rate for preview updates in Hz (max 5 Hz to limit throughput)
)
self._unique_array_id = 0
self._pv_timeout = 5
self._readout_time = 2.1e-3 # 2.1ms readout time to ensure readout is >2ms, required from ASI serval server..
self._readout_time = self.MIN_DETECTOR_READOUT_TIME
self.r_lock = threading.RLock() # Lock to access the message buffer safely
def stage(self) -> list[object] | DeviceStatus | StatusBase: # type: ignore
def stage(self) -> list[object] | StatusBase: # type: ignore
"""Stage the device.
Super stage not safe to call.."""
self.stopped = False
status = self.on_stage() # pylint: disable=assignment-from-no-return
@@ -227,25 +281,19 @@ class Timepix(PSIDeviceBase, TimePixControl):
"""Poll the array data for preview updates."""
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
try:
# logger.info(f"Running poll loop for {self.name}..")
# First check if there is a new image
if self.image.unique_id.get() == self._unique_array_id:
continue
# Get new image data
value = self.image.array_data.get()
if value is None:
logger.info(f"No image data available for preview of {self.name}")
continue
width = self.image.array_size.width.get()
height = self.image.array_size.height.get()
# Geometry correction for the image
data = np.reshape(value, (height, width))
last_image: DevicePreviewMessage = self.preview.get()
# logger.info(f"Preview image for {self.name} has shape {data.shape}")
if last_image is not None:
if np.array_equal(data, last_image.data):
# No update if image is the same, ~2.5ms on 2400x2400 image (6M)
logger.debug(
f"Pilatus preview image for {self.name} is the same as last one, not updating."
)
continue
logger.debug(f"Setting preview datsa for {self.name}")
logger.debug(f"Setting preview data for {self.name} with shape {data.shape}")
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
@@ -283,16 +331,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
described in model .timepix_fly_client.timepix_fly_interface.TimepixEndFrame
"""
n_energy_points = start_frame.get("NumEnergyPoints", None)
# if n_energy_points is None:
# logger.error(
# f"NumEnergyPoints not found in start_frame: {start_frame}. Have we received the correct frame?"
# )
# return
# # TODO What should we do here if n_energy_points and troin do not match with the expected values?
if n_energy_points != self._n_energy_points:
logger.error(
f"Number of energy points {n_energy_points} does not match expected {self._n_energy_points}."
)
troin = start_frame["TRoiN"]
if troin != self._troin:
logger.error(f"Number of pixels {troin} does not match expected {self._troin}.")
@@ -305,6 +343,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
logger.error(
f"No data frames received in msg_buffer; for start_frame: {start_frame}, end_frame: {end_frame}"
)
# TODO this should no longer happen now as this was fixed in the backend..
else:
for msg in data_frames:
tds_period += msg["period"]
@@ -314,8 +353,12 @@ class Timepix(PSIDeviceBase, TimePixControl):
# Put XES data
self.tds_period.put(tds_period)
self.tds_total_events.put(tds_total_events)
self.xes_data.put(xes_data, async_update={"type": "add", "max_shape": [None, n_energy_points, troin]})
self.xes_spectra.put(xes_data.sum(axis=1), async_update={"type": "add", "max_shape": [None, troin]})
self.xes_data.put(
xes_data, async_update={"type": "add", "max_shape": [None, n_energy_points, troin]}
)
self.xes_spectra.put(
xes_data.sum(axis=1), async_update={"type": "add", "max_shape": [None, troin]}
) # TODO check if this is the sum of time bins or energy points.. adapt..
logger.debug(f"Device data set for Timepix with {tds_period}, {tds_total_events}")
### User ACCESS methods
@@ -328,21 +371,36 @@ class Timepix(PSIDeviceBase, TimePixControl):
"""Set the pixel map from a dictionary."""
self._pixel_map = PixelMap.model_validate(pixel_map)
@property
def n_energy_points(self) -> int:
"""Energy points for the TimePix detector."""
return self._n_energy_points
def set_pixel_map_from_json_file(self, file_path: str) -> None:
"""Set the pixel map from a JSON file.
Args:
file_path (str): Path to the JSON file containing the pixel map.
"""
pixel_map = load_pixel_map_from_json(file_path)
self._pixel_map = pixel_map
@property
def pixel_map(self) -> PixelMap:
"""Get the current pixel map of the TimePix detector."""
if self._pixel_map is None:
try:
pixel_map = load_pixel_map_from_json(DEFAULT_PIXEL_MAP)
self._pixel_map = pixel_map
# pylint: disable=broad-except
# pylint: disable=raise-missing-from
except Exception:
content = traceback.format_exc()
logger.error(f"Failed to load default pixel map: {content}")
raise ValueError(
f"Failed to load default pixel map from {DEFAULT_PIXEL_MAP}: {content}"
)
return self._pixel_map
@pixel_map.setter
@typechecked
def pixel_map(self, value: PixelMap):
self._pixel_map = value
# TODO set energy points based on pixel map...
@property
def troistep(self) -> int:
@@ -381,6 +439,7 @@ class Timepix(PSIDeviceBase, TimePixControl):
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
self.set_pixel_map_from_json_file(DEFAULT_PIXEL_MAP)
self.backend.on_init()
def on_connected(self) -> None:
@@ -396,6 +455,9 @@ class Timepix(PSIDeviceBase, TimePixControl):
self.cam.trigger_mode.set(TRIGGERMODE.INTERNAL).wait(timeout=self._pv_timeout)
self.cam.trigger_source.set(TRIGGERSOURCE.HDMI1_1).wait(timeout=self._pv_timeout)
self.cam.exposure_mode.set(EXPOSUREMODE.TIMED).wait(timeout=self._pv_timeout)
self.cam.array_counter.set(0).wait(
timeout=self._pv_timeout
) # Reset array counter on connect
# self.image.unique_id.set(1).wait(timeout=self._pv_timeout)
# Prepare backend for TimePixFly
@@ -403,8 +465,8 @@ class Timepix(PSIDeviceBase, TimePixControl):
# Register the callback for processing data received by the backend
# TODO add initial callback again once issues are resolved
self.backend.add_callback(self.msg_buffer_callback)
self._msg_dump = []
# TODO remove debug code
# self._msg_dump = []
# def _on_msg_received(start_frame, data_frame, end_frame):
# """Callback"""
# self._msg_dump.append(
@@ -412,13 +474,10 @@ class Timepix(PSIDeviceBase, TimePixControl):
# )
# self.backend.add_callback(_on_msg_received)
self._poll_thread.start()
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
def on_stage(self) -> StatusBase | None:
"""Called while staging the device."""
exp_time = self.scan_info.msg.scan_parameters.get("exp_time", 0)
if exp_time - self._readout_time <= 0:
raise ValueError(
@@ -454,7 +513,10 @@ class Timepix(PSIDeviceBase, TimePixControl):
def on_unstage(self) -> None:
"""Called while unstaging the device."""
self.backend.on_unstage()
# TODO what should happen for unstage? Make sure that acquisition is not running?
# self.backend.on_unstage()
# self.cam.acquire.put(0)
# status_camera = CompareStatus(self.cam.acquire_busy, ACQUIRESTATUS.DONE)
def on_pre_scan(self) -> StatusBase:
"""Called right before the scan starts on all devices automatically."""
@@ -502,27 +564,10 @@ class Timepix(PSIDeviceBase, TimePixControl):
# Backend ready for connection, now start camera
status = self.backend.on_trigger_finished()
self.cancel_on_stop(status)
return_status = status_camera & status
return_status = status_camera and status
self.cam.acquire.put(1)
return return_status
# # Add Collect callback
# self.backend.timepix_fly_client.add_status_callback(
# status_backend_collect_started,
# success=[TimePixFlyStatus.COLLECT],
# error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
# )
# # Start on trigger on backend
# status_backend_on_trigger = self.backend.on_trigger(status=status_backend_on_trigger)
# status = AndStatusWithList(
# status_list=[status_camera, status_backend_on_trigger, status_backend_collect_started],
# device=self,
# )
# self.cancel_on_stop(status)
# return status
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
# Detector Control
@@ -538,9 +583,6 @@ class Timepix(PSIDeviceBase, TimePixControl):
self.cancel_on_stop(complete_status)
return complete_status
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
# Camera
@@ -603,13 +645,14 @@ if __name__ == "__main__": # pragma: no cover
logger.warning("Breaking loop manually after 20 seconds of waiting.")
status.set_exception(f"Failed to complete trigger after 20 seconds")
break
n_messages = len(timepix._msg_dump)
logger.warning(f"Messages in Buffer is {n_messages}")
if n_messages > 0:
msg = timepix._msg_dump[-1]
logger.warning(
f"Last message had N start_frame : {msg.get('start_frame')}, N data_frames: {len(msg.get('data_frame'))}, N end_frame : {msg.get('end_frame')}"
)
if hasattr(timepix, "_msg_dump"):
n_messages = len(timepix._msg_dump)
logger.warning(f"Messages in Buffer is {n_messages}")
if n_messages > 0:
msg = timepix._msg_dump[-1]
logger.warning(
f"Last message had N start_frame : {msg.get('start_frame')}, N data_frames: {len(msg.get('data_frame'))}, N end_frame : {msg.get('end_frame')}"
)
status = timepix.complete()
print("Waiting for timepix to complete.")
status.wait(timeout=10)

View File

@@ -1,11 +1,13 @@
"""Implementation of the Timepix Fly Backend. It handles the communication
"""
Implementation of the Timepix Fly Backend. It handles the communication
with the TimepixFly backend (https://github.com/paulscherrerinstitute/TimePixFly).
Please be aware that this was developed agains the 'dev' branch (2025/08/15).
It communicates with the backend through a simple Client (TimepixFlyClient)
that handles the REST and WebSocket communication + callbacks, and provides
hooks for all the relevant ophyd interface, 'on_stage',
'on_trigger', 'on_complete', 'on_stop', etc."""
'on_trigger', 'on_complete', 'on_stop', ...
"""
from __future__ import annotations
@@ -20,7 +22,7 @@ import uuid
from typing import TYPE_CHECKING, Callable, Tuple
from bec_lib.logger import bec_logger
from ophyd import StatusBase
from ophyd_devices import StatusBase
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_client import (
TimepixFlyClient,
@@ -75,13 +77,15 @@ class TimepixFlyBackend:
self._socket_server: socket.socket | None = None
self._data_thread: threading.Thread | None = None
self._data_thread_shutdown_event = threading.Event()
# TODO remove custom atexit handler if ophyd device cleanup works properly
atexit.register(self.on_destroy) # Ensure cleanup on exit
self.on_init() # TODO is this needed after registering the atexit handler?
self.on_init()
###################################################
###### Hooks for the PSIDeviceBase interface ######
###################################################
# TODO remove
def on_init(self):
"""Called during initialization of the device."""
try:
@@ -92,21 +96,23 @@ class TimepixFlyBackend:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
except Exception: # pylint
# pylint: disable=broad-except
except Exception:
logger.warning("Could not set signal handlers for SIGINT and SIGTERM.")
def on_connected(self):
"""Called if it is ensured that the device is connected."""
self.timepix_fly_client.on_connected()
try:
self.timepix_fly_client.on_connected()
status = self.start_data_server()
status.wait(timeout=5)
except Exception:
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error starting data server: {content}")
# pylint: disable=raise-missing-from
raise TimepixFlyBackendException(
f"Could not start data server on {self.hostname}:{self.socket_port}. Please check log for detailed error message."
f"Could not start data server on {self.hostname}:{self.socket_port}. Please check logs for more details."
)
def on_stage(self, other_config: OtherConfigModel, pixel_map: PixelMap):
@@ -117,6 +123,7 @@ class TimepixFlyBackend:
other_config (OtherConfigModel): The configuration for the Timepix Fly detector.
pixel_map (PixelMap): The pixel map for the Timepix Fly detector.
"""
time_started = time.time()
status = StatusBase()
self.cancel_on_stop(status)
self.timepix_fly_client.add_status_callback(
@@ -126,14 +133,15 @@ class TimepixFlyBackend:
)
try:
status.wait(timeout=5.0)
except Exception:
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while waiting for Timepix Fly backend to be in config state: {content}"
)
# pylint: disable=raise-missing-from
raise TimeoutError(
"Timepix Fly backend state did not reach config state. Most likely a timeout error. Please check log for detailed error message."
f"Timepix Fly backend state did not reach config state, running into timeout. Error traceback {content}."
)
status = StatusBase()
@@ -142,12 +150,22 @@ class TimepixFlyBackend:
success=[TimePixFlyStatus.CONFIG],
error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
)
logger.info(f"Setting other config, backend {other_config}")
logger.debug(f"Setting other config, backend {other_config}")
self.timepix_fly_client.set_other_config(other_config)
self.timepix_fly_client.set_pixel_map(pixel_map)
try: # TODO make asynchronous
status.wait(timeout=5.0)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while waiting for Timepix Fly backend to be in config state after setting config: {content}"
)
# pylint: disable=raise-missing-from
def on_pre_scan(self) -> None:
"""Hook for on pre_scan logic."""
raise TimeoutError(
f"Timepix Fly backend state did not reach config state after setting config, running into timeout. Error traceback {content}."
)
logger.info(f"TimePixFly backend staged after {time.time() - time_started:.3f} seconds.")
def on_trigger(
self, status: StatusBase | DeviceStatus | None = None
@@ -223,8 +241,16 @@ class TimepixFlyBackend:
)
return status
def on_unstage(self):
def on_unstage(self) -> StatusBase:
"""Hook for on_unstage logic."""
# status = StatusBase()
# self.cancel_on_stop(status)
# self.timepix_fly_client.add_status_callback(
# status,
# success=[TimePixFlyStatus.CONFIG],
# error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN],
# )
# return status
def on_destroy(self):
"""Hook for on_destroy logic."""
@@ -445,7 +471,7 @@ class TimepixFlyBackend:
if obj.get("type", "") == "EndFrame":
try:
# If the EndFrame message is received, run the callbacks
logger.info(f"Running callbacks")
logger.debug(f"Running callbacks")
self.run_msg_callbacks()
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
@@ -467,9 +493,12 @@ class TimepixFlyBackend:
start_frame = self.__msg_buffer[0]
end_frame = self.__msg_buffer[-1]
data_frames = self.__msg_buffer[1:-1]
logger.info(f"Number of callbacks {len(self.callbacks.keys())}")
for cb, kwd in self.callbacks.values():
cb(start_frame, data_frames, end_frame, **kwd)
try:
cb(start_frame, data_frames, end_frame, **kwd)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error in callback with error msg: {content}")
if __name__ == "__main__": # pragma: no cover
@@ -538,6 +567,7 @@ if __name__ == "__main__": # pragma: no cover
print(
f"Received {len(start_frames)} start frames, {len(xes_frames)} data frames, and {len(end_frames)} end frames."
)
# pylint: disable=broad-except
except Exception as e:
logger.error(f"Error during TimepixFlyBackend operation: {e}")
finally:

View File

@@ -160,17 +160,19 @@ class TimepixFlyClient:
If False, the callback will not be run immediately.
"""
if run is True:
if self.status in success:
status.set_finished()
return
if self.status in error:
last_error = self.last_error()
status.set_exception(
TimePixStatusError(
f"TimePixFly status is '{self.status.value},' last error message: {last_error.message}"
try:
if self.status in success:
status.set_finished()
return
if self.status in error:
last_error = self.last_error()
raise TimePixStatusError(
f"Current state {self.status} of TimePixFly Backend is in list of error states: {error}. Last error: {last_error.message}"
)
)
return
except Exception as e:
logger.error(f"Error while adding status callback: {e}")
if status.done is False:
status.set_exception(e)
self._status_callbacks[id(status)] = (status, success, error)
def connect(self):
@@ -268,18 +270,17 @@ class TimepixFlyClient:
continue
if status in success:
dev_status.set_finished()
logger.info(f"Status callback finished in succes: {status.value}")
logger.debug(f"Status callback finished in succes: {status.value}")
self._status_callbacks.pop(cb_id)
elif status in error:
last_error = self.last_error()
logger.error(
f"Timepix status in error is {status.value}, with last error: {last_error.message}"
)
dev_status.set_exception(
TimePixStatusError(
f"TimePixStatus status is '{status.value},' last error message: {last_error.message}"
try:
last_error = self.last_error()
raise TimePixStatusError(
f"TimePixFly Backend state '{status.value}' is in list of specified errors {error}. Last error message: {last_error.message}"
)
)
except Exception as e:
logger.error(f"Error in status callback from TimepixFly Backend: {e}")
dev_status.set_exception(e)
self._status_callbacks.pop(cb_id)
# Reset the _started flag if the status is in CONFIG.
if status == TimePixFlyStatus.CONFIG: