From c6ed27966c9ee3ad05e3551b31514c3006328ee0 Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 26 Nov 2025 13:46:53 +0100 Subject: [PATCH 1/4] fix(status): fix compare and transition status occurences --- debye_bec/devices/mo1_bragg/mo1_bragg.py | 2 +- debye_bec/devices/pilatus/pilatus.py | 2 +- debye_bec/devices/pilatus_curtain.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg.py b/debye_bec/devices/mo1_bragg/mo1_bragg.py index 7d37e9e..f1b22d5 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg.py @@ -281,7 +281,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner): self.scan_control.scan_status, transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING], strict=True, - raise_states=[ScanControlScanStatus.PARAMETER_WRONG], + failure_states=[ScanControlScanStatus.PARAMETER_WRONG], ) self.cancel_on_stop(status) start_func(1) diff --git a/debye_bec/devices/pilatus/pilatus.py b/debye_bec/devices/pilatus/pilatus.py index e12aac1..d49a692 100644 --- a/debye_bec/devices/pilatus/pilatus.py +++ b/debye_bec/devices/pilatus/pilatus.py @@ -622,7 +622,7 @@ class Pilatus(PSIDeviceBase, ADBase): # For long scans, it can be that the mono will execute one cycle more, # meaning a few more XRD triggers will be sent status_img_written = CompareStatus( - self.hdf.num_captured, self.n_images, operation=">=" + self.hdf.num_captured, self.n_images, operation_success=">=" ) else: status_img_written = CompareStatus(self.hdf.num_captured, self.n_images) diff --git a/debye_bec/devices/pilatus_curtain.py b/debye_bec/devices/pilatus_curtain.py index 4f8ab4e..c7f3f05 100644 --- a/debye_bec/devices/pilatus_curtain.py +++ b/debye_bec/devices/pilatus_curtain.py @@ -83,7 +83,7 @@ class PilatusCurtain(PSIDeviceBase): self.open_cover.put(1) # TODO timeout ok? status_open = CompareStatus(self.cover_is_open, COVER.OPEN, timeout=5) - status_error = CompareStatus(self.cover_error, COVER.ERROR, operation="!=") + status_error = CompareStatus(self.cover_error, COVER.ERROR, operation_success="!=") status = AndStatusWithList(device=self, status_list=[status_open, status_error]) return status else: @@ -95,7 +95,7 @@ class PilatusCurtain(PSIDeviceBase): self.close_cover.put(1) # TODO timeout ok? status_close = CompareStatus(self.cover_is_closed, COVER.CLOSED, timeout=5) - status_error = CompareStatus(self.cover_error, COVER.ERROR, operation="!=") + status_error = CompareStatus(self.cover_error, COVER.ERROR, operation_success="!=") status = AndStatusWithList(device=self, status_list=[status_close, status_error]) return status else: -- 2.49.1 From 0a8272685dcffa5d875b6deca2af990bf39fccd3 Mon Sep 17 00:00:00 2001 From: appel_c Date: Sun, 30 Nov 2025 22:28:34 +0100 Subject: [PATCH 2/4] fix(status): cleanup and remove of old status usage --- .../ionization_chambers/ionization_chamber.py | 3 +-- debye_bec/devices/mo1_bragg/mo1_bragg.py | 7 +++---- debye_bec/devices/nidaq/nidaq.py | 5 ++--- debye_bec/devices/pilatus/pilatus.py | 19 +++++-------------- debye_bec/devices/pilatus_curtain.py | 6 ++---- tests/tests_devices/test_pilatus.py | 2 -- 6 files changed, 13 insertions(+), 29 deletions(-) diff --git a/debye_bec/devices/ionization_chambers/ionization_chamber.py b/debye_bec/devices/ionization_chambers/ionization_chamber.py index a829560..52a6a78 100644 --- a/debye_bec/devices/ionization_chambers/ionization_chamber.py +++ b/debye_bec/devices/ionization_chambers/ionization_chamber.py @@ -9,8 +9,7 @@ from ophyd import Component as Cpt from ophyd import Device from ophyd import DynamicDeviceComponent as Dcpt from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV -from ophyd.status import DeviceStatus, SubscriptionStatus -from ophyd_devices import CompareStatus, TransitionStatus +from ophyd_devices import CompareStatus, DeviceStatus, SubscriptionStatus, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from typeguard import typechecked diff --git a/debye_bec/devices/mo1_bragg/mo1_bragg.py b/debye_bec/devices/mo1_bragg/mo1_bragg.py index f1b22d5..5189816 100644 --- a/debye_bec/devices/mo1_bragg/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg/mo1_bragg.py @@ -9,16 +9,15 @@ used to ensure that the action is executed completely. This is believed to allow for a more stable execution of the action.""" import time -from typing import Any, Literal +from typing import Literal from bec_lib.devicemanager import ScanInfo from bec_lib.logger import bec_logger from ophyd import Component as Cpt -from ophyd import DeviceStatus, Signal, StatusBase -from ophyd.status import SubscriptionStatus, WaitTimeoutError +from ophyd import DeviceStatus, StatusBase +from ophyd.status import WaitTimeoutError from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -from ophyd_devices.utils.errors import DeviceStopError from pydantic import BaseModel, Field from typeguard import typechecked diff --git a/debye_bec/devices/nidaq/nidaq.py b/debye_bec/devices/nidaq/nidaq.py index 3b7bb93..8d6dcd9 100644 --- a/debye_bec/devices/nidaq/nidaq.py +++ b/debye_bec/devices/nidaq/nidaq.py @@ -1,12 +1,11 @@ from __future__ import annotations -import time -from typing import TYPE_CHECKING, Literal, cast +from typing import TYPE_CHECKING, Literal from bec_lib.logger import bec_logger from ophyd import Component as Cpt from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase -from ophyd.status import SubscriptionStatus, WaitTimeoutError +from ophyd.status import WaitTimeoutError from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.sim.sim_signals import SetableSignal diff --git a/debye_bec/devices/pilatus/pilatus.py b/debye_bec/devices/pilatus/pilatus.py index d49a692..7dabb0d 100644 --- a/debye_bec/devices/pilatus/pilatus.py +++ b/debye_bec/devices/pilatus/pilatus.py @@ -17,12 +17,10 @@ from ophyd.areadetector.cam import ADBase, PilatusDetectorCam from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin from ophyd.areadetector.plugins import ImagePlugin_V22 as ImagePlugin from ophyd.status import WaitTimeoutError -from ophyd_devices import CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal +from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from pydantic import BaseModel, Field -from debye_bec.devices.pilatus.utils import AndStatusWithList - if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage @@ -360,14 +358,12 @@ class Pilatus(PSIDeviceBase, ADBase): # f"Live Mode on detector {self.name} did not stop: {content} after 10s." # ) - def check_detector_stop_running_acquisition(self) -> AndStatusWithList: + def check_detector_stop_running_acquisition(self) -> AndStatus: """Check if the detector is still running an acquisition.""" status_acquire = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value) status_writing = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value) status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value) - status = AndStatusWithList( - device=self, status_list=[status_acquire, status_writing, status_cam_server] - ) + status = status_acquire & status_writing & status_cam_server return status def _calculate_trigger(self, scan_msg: ScanStatusMessage): @@ -562,9 +558,7 @@ class Pilatus(PSIDeviceBase, ADBase): status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value) status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value) status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.ARMED.value) - status = AndStatusWithList( - device=self, status_list=[status_hdf, status_cam, status_cam_server] - ) + status = status_hdf & status_cam & status_cam_server self.cam.acquire.put(1) self.hdf.capture.put(1) self.cancel_on_stop(status) @@ -627,10 +621,7 @@ class Pilatus(PSIDeviceBase, ADBase): else: status_img_written = CompareStatus(self.hdf.num_captured, self.n_images) status_img_written = CompareStatus(self.hdf.num_captured, self.n_images) - status = AndStatusWithList( - device=self, - status_list=[status_hdf, status_cam, status_img_written, status_cam_server], - ) + status = status_hdf & status_cam & status_img_written & status_cam_server status.add_callback(self._complete_callback) # Callback that writing was successful self.cancel_on_stop(status) return status diff --git a/debye_bec/devices/pilatus_curtain.py b/debye_bec/devices/pilatus_curtain.py index c7f3f05..d129673 100644 --- a/debye_bec/devices/pilatus_curtain.py +++ b/debye_bec/devices/pilatus_curtain.py @@ -10,8 +10,6 @@ from ophyd import EpicsSignal, EpicsSignalRO from ophyd_devices import CompareStatus, DeviceStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -from debye_bec.devices.pilatus.utils import AndStatusWithList - if TYPE_CHECKING: from bec_lib.devicemanager import ScanInfo @@ -84,7 +82,7 @@ class PilatusCurtain(PSIDeviceBase): # TODO timeout ok? status_open = CompareStatus(self.cover_is_open, COVER.OPEN, timeout=5) status_error = CompareStatus(self.cover_error, COVER.ERROR, operation_success="!=") - status = AndStatusWithList(device=self, status_list=[status_open, status_error]) + status = status_open & status_error return status else: return None @@ -96,7 +94,7 @@ class PilatusCurtain(PSIDeviceBase): # TODO timeout ok? status_close = CompareStatus(self.cover_is_closed, COVER.CLOSED, timeout=5) status_error = CompareStatus(self.cover_error, COVER.ERROR, operation_success="!=") - status = AndStatusWithList(device=self, status_list=[status_close, status_error]) + status = status_close & status_error return status else: return None diff --git a/tests/tests_devices/test_pilatus.py b/tests/tests_devices/test_pilatus.py index 1ad3347..46228e3 100644 --- a/tests/tests_devices/test_pilatus.py +++ b/tests/tests_devices/test_pilatus.py @@ -1,10 +1,8 @@ # pylint: skip-file -import os import threading from typing import TYPE_CHECKING, Generator from unittest import mock -import numpy as np import ophyd import pytest from bec_lib.messages import ScanStatusMessage -- 2.49.1 From 99f6192f37063af62e922efcf62dbda01c14240e Mon Sep 17 00:00:00 2001 From: appel_c Date: Sun, 30 Nov 2025 22:29:23 +0100 Subject: [PATCH 3/4] refactor: deprecate duplicate Status implementation. --- debye_bec/devices/pilatus/utils.py | 243 ----------------------------- 1 file changed, 243 deletions(-) delete mode 100644 debye_bec/devices/pilatus/utils.py diff --git a/debye_bec/devices/pilatus/utils.py b/debye_bec/devices/pilatus/utils.py deleted file mode 100644 index 23272d6..0000000 --- a/debye_bec/devices/pilatus/utils.py +++ /dev/null @@ -1,243 +0,0 @@ -"""Temporary utility module for Status Object implementations.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -from ophyd import Device, DeviceStatus, StatusBase - - -class AndStatusWithList(DeviceStatus): - """ - Custom implementation of the AndStatus that combines the - option to add multiple statuses as a list, and in addition - allows for adding the Device as an object to access its - methods. - - Args""" - - def __init__( - self, - device: Device, - status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus], - **kwargs, - ): - self.all_statuses = status_list if isinstance(status_list, list) else [status_list] - super().__init__(device=device, **kwargs) - self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses] - - def inner(status): - with self._lock: - if self._externally_initiated_completion: - return - if self.done: # Return if status is already done.. It must be resolved already - return - - for st in self.all_statuses: - with st._lock: - if st.done and not st.success: - self.set_exception(st.exception()) # st._exception - return - - if all(st.done for st in self.all_statuses) and all( - st.success for st in self.all_statuses - ): - self.set_finished() - - for st in self.all_statuses: - with st._lock: - st.add_callback(inner) - - # TODO improve __repr__ and __str__ - def __repr__(self): - return "".format(self=self) - - def __str__(self): - return "".format(self=self) - - def __contains__(self, status: StatusBase | DeviceStatus) -> bool: - for child in self.all_statuses: - if child == status: - return True - if isinstance(child, AndStatusWithList): - if status in child: - return True - - return False - - # # TODO Check if this actually works.... - # def set_exception(self, exc): - # # Propagate the exception to all sub-statuses that are not done yet. - # - # with self._lock: - # if self._externally_initiated_completion: - # return - # if self.done: # Return if status is already done.. It must be resolved already - # return - # super().set_exception(exc) - # for st in self.all_statuses: - # with st._lock: - # if not st.done: - # st.set_exception(exc) - - def _run_callbacks(self): - """ - Set the Event and run the callbacks. - """ - if self.timeout is None: - timeout = None - else: - timeout = self.timeout + self.settle_time - if not self._settled_event.wait(timeout): - self.log.warning("%r has timed out", self) - with self._externally_initiated_completion_lock: - if self._exception is None: - exc = TimeoutError( - f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." - ) - self._exception = exc - # Mark this as "settled". - try: - self._settled() - except Exception: - self.log.exception("%r encountered error during _settled()", self) - with self._lock: - self._event.set() - if self._exception is not None: - try: - self._handle_failure() - except Exception: - self.log.exception("%r encountered an error during _handle_failure()", self) - for cb in self._callbacks: - try: - cb(self) - except Exception: - self.log.exception( - "An error was raised on a background thread while " - "running the callback %r(%r).", - cb, - self, - ) - self._callbacks.clear() - - -class AndStatus(StatusBase): - """Custom AndStatus for TimePix detector.""" - - def __init__( - self, - left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None, - name: str | Device | None = None, - right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None, - **kwargs, - ): - self.left = left if isinstance(left, list) else [left] - if right is not None: - self.right = right if isinstance(right, list) else [right] - else: - self.right = [] - self.all_statuses = self.left + self.right - if name is None: - name = "unname_status" - elif isinstance(name, Device): - name = name.name - else: - name = name - self.name = name - super().__init__(**kwargs) - self._trace_attributes["left"] = [st._trace_attributes for st in self.left] - self._trace_attributes["right"] = [st._trace_attributes for st in self.right] - - def inner(status): - with self._lock: - if self._externally_initiated_completion: - return - if self.done: # Return if status is already done.. It must be resolved already - return - - for st in self.all_statuses: - with st._lock: - if st.done and not st.success: - self.set_exception(st.exception()) # st._exception - return - - if all(st.done for st in self.all_statuses) and all( - st.success for st in self.all_statuses - ): - self.set_finished() - - for st in self.all_statuses: - with st._lock: - st.add_callback(inner) - - def __repr__(self): - return "({self.left!r} & {self.right!r})".format(self=self) - - def __str__(self): - return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self) - - def __contains__(self, status: StatusBase) -> bool: - for child in [self.left, self.right]: - if child == status: - return True - if isinstance(child, AndStatus): - if status in child: - return True - - return False - - def _run_callbacks(self): - """ - Set the Event and run the callbacks. - """ - if self.timeout is None: - timeout = None - else: - timeout = self.timeout + self.settle_time - if not self._settled_event.wait(timeout): - # We have timed out. It's possible that set_finished() has already - # been called but we got here before the settle_time timer expired. - # And it's possible that in this space be between the above - # statement timing out grabbing the lock just below, - # set_exception(exc) has been called. Both of these possibilties - # are accounted for. - self.log.warning("%r has timed out", self) - with self._externally_initiated_completion_lock: - # Set the exception and mark the Status as done, unless - # set_exception(exc) was called externally before we grabbed - # the lock. - if self._exception is None: - exc = TimeoutError( - f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}." - ) - self._exception = exc - # Mark this as "settled". - try: - self._settled() - except Exception: - # No alternative but to log this. We can't supersede set_exception, - # and we have to continue and run the callbacks. - self.log.exception("%r encountered error during _settled()", self) - # Now we know whether or not we have succeed or failed, either by - # timeout above or by set_exception(exc), so we can set the Event that - # will mark this Status as done. - with self._lock: - self._event.set() - if self._exception is not None: - try: - self._handle_failure() - except Exception: - self.log.exception("%r encountered an error during _handle_failure()", self) - # The callbacks have access to self, from which they can distinguish - # success or failure. - for cb in self._callbacks: - try: - cb(self) - except Exception: - self.log.exception( - "An error was raised on a background thread while " - "running the callback %r(%r).", - cb, - self, - ) - self._callbacks.clear() -- 2.49.1 From 804a731181d452b600b9bb3833354cb48ebdd001 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 5 Dec 2025 14:17:02 +0100 Subject: [PATCH 4/4] test(pilatus): Fix on_complete callback for pilatus --- debye_bec/devices/pilatus/pilatus.py | 8 ++++---- tests/tests_devices/test_pilatus.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/debye_bec/devices/pilatus/pilatus.py b/debye_bec/devices/pilatus/pilatus.py index 7dabb0d..3db1c4c 100644 --- a/debye_bec/devices/pilatus/pilatus.py +++ b/debye_bec/devices/pilatus/pilatus.py @@ -587,15 +587,15 @@ class Pilatus(PSIDeviceBase, ADBase): scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step" ): # TODO how to deal with fly scans? if status.success: - status.device.file_event.put( - file_path=status.device._full_path, # pylint: disable:protected-access + self.file_event.put( + file_path=self._full_path, done=True, successful=True, hinted_h5_entries={"data": "/entry/data/data"}, ) else: - status.device.file_event.put( - file_path=status.device._full_path, # pylint: disable:protected-access + self.file_event.put( + file_path=self._full_path, done=True, successful=False, hinted_h5_entries={"data": "/entry/data/data"}, diff --git a/tests/tests_devices/test_pilatus.py b/tests/tests_devices/test_pilatus.py index 46228e3..6403e94 100644 --- a/tests/tests_devices/test_pilatus.py +++ b/tests/tests_devices/test_pilatus.py @@ -175,7 +175,7 @@ def test_pilatus_on_trigger_cancel_on_stop(pilatus): status.wait(timeout=5) -def test_pilatus_on_complete(pilatus): +def test_pilatus_on_complete(pilatus: Pilatus): """Test the on_complete logic of the Pilatus detector.""" if pilatus.scan_info.msg.scan_name.startswith("xas"): -- 2.49.1