From 99f6192f37063af62e922efcf62dbda01c14240e Mon Sep 17 00:00:00 2001 From: appel_c Date: Sun, 30 Nov 2025 22:29:23 +0100 Subject: [PATCH] refactor: deprecate duplicate Status implementation. --- debye_bec/devices/pilatus/utils.py | 243 ----------------------------- 1 file changed, 243 deletions(-) delete mode 100644 debye_bec/devices/pilatus/utils.py diff --git a/debye_bec/devices/pilatus/utils.py b/debye_bec/devices/pilatus/utils.py deleted file mode 100644 index 23272d6..0000000 --- a/debye_bec/devices/pilatus/utils.py +++ /dev/null @@ -1,243 +0,0 @@ -"""Temporary utility module for Status Object implementations.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -from ophyd import Device, DeviceStatus, StatusBase - - -class AndStatusWithList(DeviceStatus): - """ - Custom implementation of the AndStatus that combines the - option to add multiple statuses as a list, and in addition - allows for adding the Device as an object to access its - methods. - - Args""" - - def __init__( - self, - device: Device, - status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus], - **kwargs, - ): - self.all_statuses = status_list if isinstance(status_list, list) else [status_list] - super().__init__(device=device, **kwargs) - self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses] - - def inner(status): - with self._lock: - if self._externally_initiated_completion: - return - if self.done: # Return if status is already done.. It must be resolved already - return - - for st in self.all_statuses: - with st._lock: - if st.done and not st.success: - self.set_exception(st.exception()) # st._exception - return - - if all(st.done for st in self.all_statuses) and all( - st.success for st in self.all_statuses - ): - self.set_finished() - - for st in self.all_statuses: - with st._lock: - st.add_callback(inner) - - # TODO improve __repr__ and __str__ - def __repr__(self): - return "".format(self=self) - - def __str__(self): - return "".format(self=self) - - def __contains__(self, status: StatusBase | DeviceStatus) -> bool: - for child in self.all_statuses: - if child == status: - return True - if isinstance(child, AndStatusWithList): - if status in child: - return True - - return False - - # # TODO Check if this actually works.... - # def set_exception(self, exc): - # # Propagate the exception to all sub-statuses that are not done yet. - # - # with self._lock: - # if self._externally_initiated_completion: - # return - # if self.done: # Return if status is already done.. It must be resolved already - # return - # super().set_exception(exc) - # for st in self.all_statuses: - # with st._lock: - # if not st.done: - # st.set_exception(exc) - - def _run_callbacks(self): - """ - Set the Event and run the callbacks. - """ - if self.timeout is None: - timeout = None - else: - timeout = self.timeout + self.settle_time - if not self._settled_event.wait(timeout): - self.log.warning("%r has timed out", self) - with self._externally_initiated_completion_lock: - if self._exception is None: - exc = TimeoutError( - f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." - ) - self._exception = exc - # Mark this as "settled". - try: - self._settled() - except Exception: - self.log.exception("%r encountered error during _settled()", self) - with self._lock: - self._event.set() - if self._exception is not None: - try: - self._handle_failure() - except Exception: - self.log.exception("%r encountered an error during _handle_failure()", self) - for cb in self._callbacks: - try: - cb(self) - except Exception: - self.log.exception( - "An error was raised on a background thread while " - "running the callback %r(%r).", - cb, - self, - ) - self._callbacks.clear() - - -class AndStatus(StatusBase): - """Custom AndStatus for TimePix detector.""" - - def __init__( - self, - left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None, - name: str | Device | None = None, - right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None, - **kwargs, - ): - self.left = left if isinstance(left, list) else [left] - if right is not None: - self.right = right if isinstance(right, list) else [right] - else: - self.right = [] - self.all_statuses = self.left + self.right - if name is None: - name = "unname_status" - elif isinstance(name, Device): - name = name.name - else: - name = name - self.name = name - super().__init__(**kwargs) - self._trace_attributes["left"] = [st._trace_attributes for st in self.left] - self._trace_attributes["right"] = [st._trace_attributes for st in self.right] - - def inner(status): - with self._lock: - if self._externally_initiated_completion: - return - if self.done: # Return if status is already done.. It must be resolved already - return - - for st in self.all_statuses: - with st._lock: - if st.done and not st.success: - self.set_exception(st.exception()) # st._exception - return - - if all(st.done for st in self.all_statuses) and all( - st.success for st in self.all_statuses - ): - self.set_finished() - - for st in self.all_statuses: - with st._lock: - st.add_callback(inner) - - def __repr__(self): - return "({self.left!r} & {self.right!r})".format(self=self) - - def __str__(self): - return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self) - - def __contains__(self, status: StatusBase) -> bool: - for child in [self.left, self.right]: - if child == status: - return True - if isinstance(child, AndStatus): - if status in child: - return True - - return False - - def _run_callbacks(self): - """ - Set the Event and run the callbacks. - """ - if self.timeout is None: - timeout = None - else: - timeout = self.timeout + self.settle_time - if not self._settled_event.wait(timeout): - # We have timed out. It's possible that set_finished() has already - # been called but we got here before the settle_time timer expired. - # And it's possible that in this space be between the above - # statement timing out grabbing the lock just below, - # set_exception(exc) has been called. Both of these possibilties - # are accounted for. - self.log.warning("%r has timed out", self) - with self._externally_initiated_completion_lock: - # Set the exception and mark the Status as done, unless - # set_exception(exc) was called externally before we grabbed - # the lock. - if self._exception is None: - exc = TimeoutError( - f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." - ) - self._exception = exc - # Mark this as "settled". - try: - self._settled() - except Exception: - # No alternative but to log this. We can't supersede set_exception, - # and we have to continue and run the callbacks. - self.log.exception("%r encountered error during _settled()", self) - # Now we know whether or not we have succeed or failed, either by - # timeout above or by set_exception(exc), so we can set the Event that - # will mark this Status as done. - with self._lock: - self._event.set() - if self._exception is not None: - try: - self._handle_failure() - except Exception: - self.log.exception("%r encountered an error during _handle_failure()", self) - # The callbacks have access to self, from which they can distinguish - # success or failure. - for cb in self._callbacks: - try: - cb(self) - except Exception: - self.log.exception( - "An error was raised on a background thread while " - "running the callback %r(%r).", - cb, - self, - ) - self._callbacks.clear()