diff --git a/superxas_bec/devices/timepix/timepix.py b/superxas_bec/devices/timepix/timepix.py index 991e444..6575688 100644 --- a/superxas_bec/devices/timepix/timepix.py +++ b/superxas_bec/devices/timepix/timepix.py @@ -11,14 +11,13 @@ import enum import threading import time import traceback -from typing import TYPE_CHECKING, Any, Literal +from typing import Any, Literal import numpy as np from bec_lib.logger import bec_logger from ophyd import ADBase from ophyd import Component as Cpt -from ophyd import Device, DeviceStatus, Kind, StatusBase -from ophyd_devices import AndStatus as _AndStatus +from ophyd import DeviceStatus, Kind, StatusBase from ophyd_devices import AsyncSignal, CompareStatus, TransitionStatus from ophyd_devices.devices.areadetector.cam import ASItpxCam from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase @@ -30,140 +29,7 @@ from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_interface impor OtherConfigModel, PixelMap, ) - - -class AndStatus(StatusBase): - """Custom AndStatus for TimePix detector.""" - - def __init__( - self, - left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None, - name: str | Device | None = None, - right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None, - **kwargs, - ): - self.left = left if isinstance(left, list) else [left] - if right is not None: - self.right = right if isinstance(right, list) else [right] - else: - self.right = [] - self.all_statuses = self.left + self.right - if name is None: - name = "unname_status" - elif isinstance(name, Device): - name = name.name - else: - name = name - self.name = name - super().__init__(**kwargs) - self._trace_attributes["left"] = [st._trace_attributes for st in self.left] - self._trace_attributes["right"] = [st._trace_attributes for st in self.right] - - def inner(status): - with self._lock: - if self._externally_initiated_completion: - return - if self.done: # Return if status is already done.. It must be resolved already - return - - for st in self.all_statuses: - with st._lock: - if st.done and not st.success: - self.set_exception(st.exception()) # st._exception - return - - if all(st.done for st in self.all_statuses) and all( - st.success for st in self.all_statuses - ): - self.set_finished() - - for st in self.all_statuses: - with st._lock: - st.add_callback(inner) - - def __repr__(self): - return "({self.left!r} & {self.right!r})".format(self=self) - - def __str__(self): - return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self) - - def __contains__(self, status: StatusBase) -> bool: - for child in [self.left, self.right]: - if child == status: - return True - if isinstance(child, AndStatus): - if status in child: - return True - - return False - - # def set_exception(self, exc): - # with self._lock: - # super().set_exception(exc) - # for st in self.all_statuses: - # with st._lock: - # #TODO consider removing - # if not st.done: - # st.set_exception( - # RuntimeError(f"AndStatus exception on high-level status, caused by {exc}") - # ) - - def _run_callbacks(self): - """ - Set the Event and run the callbacks. - """ - if self.timeout is None: - timeout = None - else: - timeout = self.timeout + self.settle_time - if not self._settled_event.wait(timeout): - # We have timed out. It's possible that set_finished() has already - # been called but we got here before the settle_time timer expired. - # And it's possible that in this space be between the above - # statement timing out grabbing the lock just below, - # set_exception(exc) has been called. Both of these possibilties - # are accounted for. - self.log.warning("%r has timed out", self) - with self._externally_initiated_completion_lock: - # Set the exception and mark the Status as done, unless - # set_exception(exc) was called externally before we grabbed - # the lock. - if self._exception is None: - exc = TimeoutError( - f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." - ) - self._exception = exc - # Mark this as "settled". - try: - self._settled() - except Exception: - # No alternative but to log this. We can't supersede set_exception, - # and we have to continue and run the callbacks. - self.log.exception("%r encountered error during _settled()", self) - # Now we know whether or not we have succeed or failed, either by - # timeout above or by set_exception(exc), so we can set the Event that - # will mark this Status as done. - with self._lock: - self._event.set() - if self._exception is not None: - try: - self._handle_failure() - except Exception: - self.log.exception("%r encountered an error during _handle_failure()", self) - # The callbacks have access to self, from which they can distinguish - # success or failure. - for cb in self._callbacks: - try: - cb(self) - except Exception: - self.log.exception( - "An error was raised on a background thread while " - "running the callback %r(%r).", - cb, - self, - ) - self._callbacks.clear() - +from superxas_bec.devices.timepix.utils import AndStatusWithList logger = bec_logger.logger @@ -242,6 +108,8 @@ class TimePixControl(ADBase): """Interface for the TimePix EPICS control of the TimePix detector.""" cam = Cpt(ASItpxCam, "cam1:") + # latest hdf5 plugin + # latest image plugin class Timepix(PSIDeviceBase, TimePixControl): @@ -574,9 +442,9 @@ class Timepix(PSIDeviceBase, TimePixControl): # Start on trigger on backend status_backend_on_trigger = self.backend.on_trigger(status=status_backend_on_trigger) - status = AndStatus( - [status_camera, status_backend_on_trigger, status_backend_collect_started], - name=f"{self.name}_trigger_status", + status = AndStatusWithList( + status_list=[status_camera, status_backend_on_trigger, status_backend_collect_started], + device=self, ) self.cancel_on_stop(status) return status @@ -590,8 +458,8 @@ class Timepix(PSIDeviceBase, TimePixControl): # Add callback to the backend complete handling status_backend = self.backend.on_complete(status=status_backend) # Combine the statuses - complete_status = AndStatus( - [status_backend, status_detector], name=f"{self.name}_complete_status" + complete_status = AndStatusWithList( + status_list=[status_backend, status_detector], device=self ) self.cancel_on_stop(complete_status) return complete_status diff --git a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py index 67c6fa2..fd88a4e 100644 --- a/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py +++ b/superxas_bec/devices/timepix/timepix_fly_client/timepix_fly_backend.py @@ -170,6 +170,32 @@ class TimepixFlyBackend: self.timepix_fly_client.start() return status + def on_trigger_finished( + self, status: StatusBase | DeviceStatus | None = None + ) -> StatusBase | DeviceStatus: + """ + Hook for on_trigger_finished logic. It adds a status callback based on the TimePixFlyStatus. + The backend needs to get into the CONFIG state again after a trigger is finished. + In practice, a full scan logic is happening during on trigger. + The status will be marked as finished/successful when the backend state + reaches CONFIG. If an exception state is reached, the status will be marked as failed. + + Args: + status (StatusBase | DeviceStatus | None): The status object to track the operation. + If None, a new StatusBase object will be created. + Returns: + StatusBase | DeviceStatus: The status object that will be updated with the operation's result + """ + if status is None: + status = StatusBase() + self.cancel_on_stop(status) + self.timepix_fly_client.add_status_callback( + status, + success=[TimePixFlyStatus.CONFIG], + error=[TimePixFlyStatus.EXCEPT, TimePixFlyStatus.SHUTDOWN], + ) + return status + def on_complete( self, status: StatusBase | DeviceStatus | None = None ) -> StatusBase | DeviceStatus: @@ -495,10 +521,13 @@ if __name__ == "__main__": # pragma: no cover # print("TimepixFlyBackend pre-scan started.") status_1.wait(timeout=10) mock_server.start_acquisition() - # print("Acquisition started on mock server.") - status_2 = timepix.on_complete() + status_2 = timepix.on_trigger_finished() status_2.wait(timeout=10) + # print("Acquisition started on mock server.") + print("TimepixFlyBackend scan completed.") + status = timepix.on_complete() + status.wait(timeout=10) print( f"Received {len(start_frames)} start frames, {len(xes_frames)} data frames, and {len(end_frames)} end frames." ) @@ -506,3 +535,4 @@ if __name__ == "__main__": # pragma: no cover logger.error(f"Error during TimepixFlyBackend operation: {e}") finally: timepix.on_destroy() + print("TimepixFlyBackend destroyed.") diff --git a/superxas_bec/devices/timepix/utils.py b/superxas_bec/devices/timepix/utils.py index 2df454c..d7c8634 100644 --- a/superxas_bec/devices/timepix/utils.py +++ b/superxas_bec/devices/timepix/utils.py @@ -1,4 +1,240 @@ -# """Utility module for the Timepix detector.""" +"""Utility module for the Timepix detector.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from ophyd import Device, DeviceStatus, StatusBase + + +class AndStatusWithList(DeviceStatus): + """ + Custom implementation of the AndStatus that combines the + option to add multiple statuses as a list, and in addition + allows for adding the Device as an object to access its + methods. + + Args""" + + def __init__( + self, + device: Device, + status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus], + **kwargs, + ): + self.all_statuses = status_list if isinstance(status_list, list) else [status_list] + super().__init__(device=device, **kwargs) + self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses] + + def inner(status): + with self._lock: + if self._externally_initiated_completion: + return + if self.done: # Return if status is already done.. It must be resolved already + return + + for st in self.all_statuses: + with st._lock: + if st.done and not st.success: + self.set_exception(st.exception()) # st._exception + return + + if all(st.done for st in self.all_statuses) and all( + st.success for st in self.all_statuses + ): + self.set_finished() + + for st in self.all_statuses: + with st._lock: + st.add_callback(inner) + + # TODO improve __repr__ and __str__ + def __repr__(self): + return "".format(self=self) + + def __str__(self): + return "".format(self=self) + + def __contains__(self, status: StatusBase | DeviceStatus) -> bool: + for child in self.all_statuses: + if child == status: + return True + if isinstance(child, AndStatusWithList): + if status in child: + return True + + return False + + # TODO Check if this actually works.... + def set_exception(self, exc): + with self._lock: + for st in self.all_statuses: + with st._lock: + if not st.done: + st.set_exception(exc) + + def _run_callbacks(self): + """ + Set the Event and run the callbacks. + """ + if self.timeout is None: + timeout = None + else: + timeout = self.timeout + self.settle_time + if not self._settled_event.wait(timeout): + self.log.warning("%r has timed out", self) + with self._externally_initiated_completion_lock: + if self._exception is None: + exc = TimeoutError( + f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." + ) + self._exception = exc + # Mark this as "settled". + try: + self._settled() + except Exception: + self.log.exception("%r encountered error during _settled()", self) + with self._lock: + self._event.set() + if self._exception is not None: + try: + self._handle_failure() + except Exception: + self.log.exception("%r encountered an error during _handle_failure()", self) + for cb in self._callbacks: + try: + cb(self) + except Exception: + self.log.exception( + "An error was raised on a background thread while " + "running the callback %r(%r).", + cb, + self, + ) + self._callbacks.clear() + + +class AndStatus(StatusBase): + """Custom AndStatus for TimePix detector.""" + + def __init__( + self, + left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None, + name: str | Device | None = None, + right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None, + **kwargs, + ): + self.left = left if isinstance(left, list) else [left] + if right is not None: + self.right = right if isinstance(right, list) else [right] + else: + self.right = [] + self.all_statuses = self.left + self.right + if name is None: + name = "unname_status" + elif isinstance(name, Device): + name = name.name + else: + name = name + self.name = name + super().__init__(**kwargs) + self._trace_attributes["left"] = [st._trace_attributes for st in self.left] + self._trace_attributes["right"] = [st._trace_attributes for st in self.right] + + def inner(status): + with self._lock: + if self._externally_initiated_completion: + return + if self.done: # Return if status is already done.. It must be resolved already + return + + for st in self.all_statuses: + with st._lock: + if st.done and not st.success: + self.set_exception(st.exception()) # st._exception + return + + if all(st.done for st in self.all_statuses) and all( + st.success for st in self.all_statuses + ): + self.set_finished() + + for st in self.all_statuses: + with st._lock: + st.add_callback(inner) + + def __repr__(self): + return "({self.left!r} & {self.right!r})".format(self=self) + + def __str__(self): + return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self) + + def __contains__(self, status: StatusBase) -> bool: + for child in [self.left, self.right]: + if child == status: + return True + if isinstance(child, AndStatus): + if status in child: + return True + + return False + + def _run_callbacks(self): + """ + Set the Event and run the callbacks. + """ + if self.timeout is None: + timeout = None + else: + timeout = self.timeout + self.settle_time + if not self._settled_event.wait(timeout): + # We have timed out. It's possible that set_finished() has already + # been called but we got here before the settle_time timer expired. + # And it's possible that in this space be between the above + # statement timing out grabbing the lock just below, + # set_exception(exc) has been called. Both of these possibilties + # are accounted for. + self.log.warning("%r has timed out", self) + with self._externally_initiated_completion_lock: + # Set the exception and mark the Status as done, unless + # set_exception(exc) was called externally before we grabbed + # the lock. + if self._exception is None: + exc = TimeoutError( + f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." + ) + self._exception = exc + # Mark this as "settled". + try: + self._settled() + except Exception: + # No alternative but to log this. We can't supersede set_exception, + # and we have to continue and run the callbacks. + self.log.exception("%r encountered error during _settled()", self) + # Now we know whether or not we have succeed or failed, either by + # timeout above or by set_exception(exc), so we can set the Event that + # will mark this Status as done. + with self._lock: + self._event.set() + if self._exception is not None: + try: + self._handle_failure() + except Exception: + self.log.exception("%r encountered an error during _handle_failure()", self) + # The callbacks have access to self, from which they can distinguish + # success or failure. + for cb in self._callbacks: + try: + cb(self) + except Exception: + self.log.exception( + "An error was raised on a background thread while " + "running the callback %r(%r).", + cb, + self, + ) + self._callbacks.clear() + # from __future__ import annotations