diff --git a/debye_bec/devices/pilatus/utils.py b/debye_bec/devices/pilatus/utils.py new file mode 100644 index 0000000..ae025b3 --- /dev/null +++ b/debye_bec/devices/pilatus/utils.py @@ -0,0 +1,236 @@ +"""Temporary utility module for Status Object implementations.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from ophyd import Device, DeviceStatus, StatusBase + + +class AndStatusWithList(DeviceStatus): + """ + Custom implementation of the AndStatus that combines the + option to add multiple statuses as a list, and in addition + allows for adding the Device as an object to access its + methods. + + Args""" + + def __init__( + self, + device: Device, + status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus], + **kwargs, + ): + self.all_statuses = status_list if isinstance(status_list, list) else [status_list] + super().__init__(device=device, **kwargs) + self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses] + + def inner(status): + with self._lock: + if self._externally_initiated_completion: + return + if self.done: # Return if status is already done.. It must be resolved already + return + + for st in self.all_statuses: + with st._lock: + if st.done and not st.success: + self.set_exception(st.exception()) # st._exception + return + + if all(st.done for st in self.all_statuses) and all( + st.success for st in self.all_statuses + ): + self.set_finished() + + for st in self.all_statuses: + with st._lock: + st.add_callback(inner) + + # TODO improve __repr__ and __str__ + def __repr__(self): + return "".format(self=self) + + def __str__(self): + return "".format(self=self) + + def __contains__(self, status: StatusBase | DeviceStatus) -> bool: + for child in self.all_statuses: + if child == status: + return True + if isinstance(child, AndStatusWithList): + if status in child: + return True + + return False + + # TODO Check if this actually works.... + def set_exception(self, exc): + with self._lock: + for st in self.all_statuses: + with st._lock: + if not st.done: + st.set_exception(exc) + + def _run_callbacks(self): + """ + Set the Event and run the callbacks. + """ + if self.timeout is None: + timeout = None + else: + timeout = self.timeout + self.settle_time + if not self._settled_event.wait(timeout): + self.log.warning("%r has timed out", self) + with self._externally_initiated_completion_lock: + if self._exception is None: + exc = TimeoutError( + f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." + ) + self._exception = exc + # Mark this as "settled". + try: + self._settled() + except Exception: + self.log.exception("%r encountered error during _settled()", self) + with self._lock: + self._event.set() + if self._exception is not None: + try: + self._handle_failure() + except Exception: + self.log.exception("%r encountered an error during _handle_failure()", self) + for cb in self._callbacks: + try: + cb(self) + except Exception: + self.log.exception( + "An error was raised on a background thread while " + "running the callback %r(%r).", + cb, + self, + ) + self._callbacks.clear() + + +class AndStatus(StatusBase): + """Custom AndStatus for TimePix detector.""" + + def __init__( + self, + left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None, + name: str | Device | None = None, + right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None, + **kwargs, + ): + self.left = left if isinstance(left, list) else [left] + if right is not None: + self.right = right if isinstance(right, list) else [right] + else: + self.right = [] + self.all_statuses = self.left + self.right + if name is None: + name = "unname_status" + elif isinstance(name, Device): + name = name.name + else: + name = name + self.name = name + super().__init__(**kwargs) + self._trace_attributes["left"] = [st._trace_attributes for st in self.left] + self._trace_attributes["right"] = [st._trace_attributes for st in self.right] + + def inner(status): + with self._lock: + if self._externally_initiated_completion: + return + if self.done: # Return if status is already done.. It must be resolved already + return + + for st in self.all_statuses: + with st._lock: + if st.done and not st.success: + self.set_exception(st.exception()) # st._exception + return + + if all(st.done for st in self.all_statuses) and all( + st.success for st in self.all_statuses + ): + self.set_finished() + + for st in self.all_statuses: + with st._lock: + st.add_callback(inner) + + def __repr__(self): + return "({self.left!r} & {self.right!r})".format(self=self) + + def __str__(self): + return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self) + + def __contains__(self, status: StatusBase) -> bool: + for child in [self.left, self.right]: + if child == status: + return True + if isinstance(child, AndStatus): + if status in child: + return True + + return False + + def _run_callbacks(self): + """ + Set the Event and run the callbacks. + """ + if self.timeout is None: + timeout = None + else: + timeout = self.timeout + self.settle_time + if not self._settled_event.wait(timeout): + # We have timed out. It's possible that set_finished() has already + # been called but we got here before the settle_time timer expired. + # And it's possible that in this space be between the above + # statement timing out grabbing the lock just below, + # set_exception(exc) has been called. Both of these possibilties + # are accounted for. + self.log.warning("%r has timed out", self) + with self._externally_initiated_completion_lock: + # Set the exception and mark the Status as done, unless + # set_exception(exc) was called externally before we grabbed + # the lock. + if self._exception is None: + exc = TimeoutError( + f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." + ) + self._exception = exc + # Mark this as "settled". + try: + self._settled() + except Exception: + # No alternative but to log this. We can't supersede set_exception, + # and we have to continue and run the callbacks. + self.log.exception("%r encountered error during _settled()", self) + # Now we know whether or not we have succeed or failed, either by + # timeout above or by set_exception(exc), so we can set the Event that + # will mark this Status as done. + with self._lock: + self._event.set() + if self._exception is not None: + try: + self._handle_failure() + except Exception: + self.log.exception("%r encountered an error during _handle_failure()", self) + # The callbacks have access to self, from which they can distinguish + # success or failure. + for cb in self._callbacks: + try: + cb(self) + except Exception: + self.log.exception( + "An error was raised on a background thread while " + "running the callback %r(%r).", + cb, + self, + ) + self._callbacks.clear()