Fix compare and transition status #67

Merged
appel_c merged 4 commits from fix/compare_transition_status_refactoring into main 2025-12-05 16:39:13 +01:00
7 changed files with 22 additions and 281 deletions

View File

@@ -9,8 +9,7 @@ from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd.status import DeviceStatus, SubscriptionStatus
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices import CompareStatus, DeviceStatus, SubscriptionStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked

View File

@@ -9,16 +9,15 @@ used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action."""
import time
from typing import Any, Literal
from typing import Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd import DeviceStatus, StatusBase
from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field
from typeguard import typechecked
@@ -281,7 +280,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
self.scan_control.scan_status,
transitions=[ScanControlScanStatus.READY, ScanControlScanStatus.RUNNING],
strict=True,
raise_states=[ScanControlScanStatus.PARAMETER_WRONG],
failure_states=[ScanControlScanStatus.PARAMETER_WRONG],
)
self.cancel_on_stop(status)
start_func(1)

View File

@@ -1,12 +1,11 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal, cast
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal

View File

@@ -17,12 +17,10 @@ from ophyd.areadetector.cam import ADBase, PilatusDetectorCam
from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin
from ophyd.areadetector.plugins import ImagePlugin_V22 as ImagePlugin
from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal
from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from pydantic import BaseModel, Field
from debye_bec.devices.pilatus.utils import AndStatusWithList
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage
@@ -360,14 +358,12 @@ class Pilatus(PSIDeviceBase, ADBase):
# f"Live Mode on detector {self.name} did not stop: {content} after 10s."
# )
def check_detector_stop_running_acquisition(self) -> AndStatusWithList:
def check_detector_stop_running_acquisition(self) -> AndStatus:
"""Check if the detector is still running an acquisition."""
status_acquire = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
status_writing = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value)
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value)
status = AndStatusWithList(
device=self, status_list=[status_acquire, status_writing, status_cam_server]
)
status = status_acquire & status_writing & status_cam_server
return status
def _calculate_trigger(self, scan_msg: ScanStatusMessage):
@@ -562,9 +558,7 @@ class Pilatus(PSIDeviceBase, ADBase):
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value)
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.ARMED.value)
status = AndStatusWithList(
device=self, status_list=[status_hdf, status_cam, status_cam_server]
)
status = status_hdf & status_cam & status_cam_server
self.cam.acquire.put(1)
self.hdf.capture.put(1)
self.cancel_on_stop(status)
@@ -593,15 +587,15 @@ class Pilatus(PSIDeviceBase, ADBase):
scan_msg.scan_name in self.xas_xrd_scan_names or scan_msg.scan_type == "step"
): # TODO how to deal with fly scans?
if status.success:
status.device.file_event.put(
file_path=status.device._full_path, # pylint: disable:protected-access
self.file_event.put(
file_path=self._full_path,
done=True,
successful=True,
hinted_h5_entries={"data": "/entry/data/data"},
)
else:
status.device.file_event.put(
file_path=status.device._full_path, # pylint: disable:protected-access
self.file_event.put(
file_path=self._full_path,
done=True,
successful=False,
hinted_h5_entries={"data": "/entry/data/data"},
@@ -622,15 +616,12 @@ class Pilatus(PSIDeviceBase, ADBase):
# For long scans, it can be that the mono will execute one cycle more,
# meaning a few more XRD triggers will be sent
status_img_written = CompareStatus(
self.hdf.num_captured, self.n_images, operation=">="
self.hdf.num_captured, self.n_images, operation_success=">="
)
else:
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
status = AndStatusWithList(
device=self,
status_list=[status_hdf, status_cam, status_img_written, status_cam_server],
)
status = status_hdf & status_cam & status_img_written & status_cam_server
status.add_callback(self._complete_callback) # Callback that writing was successful
self.cancel_on_stop(status)
return status

View File

@@ -1,243 +0,0 @@
"""Temporary utility module for Status Object implementations."""
from __future__ import annotations
from typing import TYPE_CHECKING
from ophyd import Device, DeviceStatus, StatusBase
class AndStatusWithList(DeviceStatus):
"""
Custom implementation of the AndStatus that combines the
option to add multiple statuses as a list, and in addition
allows for adding the Device as an object to access its
methods.
Args"""
def __init__(
self,
device: Device,
status_list: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus],
**kwargs,
):
self.all_statuses = status_list if isinstance(status_list, list) else [status_list]
super().__init__(device=device, **kwargs)
self._trace_attributes["all"] = [st._trace_attributes for st in self.all_statuses]
def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
if self.done: # Return if status is already done.. It must be resolved already
return
for st in self.all_statuses:
with st._lock:
if st.done and not st.success:
self.set_exception(st.exception()) # st._exception
return
if all(st.done for st in self.all_statuses) and all(
st.success for st in self.all_statuses
):
self.set_finished()
for st in self.all_statuses:
with st._lock:
st.add_callback(inner)
# TODO improve __repr__ and __str__
def __repr__(self):
return "<AndStatusWithList({self.all_statuses!r})>".format(self=self)
def __str__(self):
return "<AndStatusWithList(done={self.done}, success={self.success})>".format(self=self)
def __contains__(self, status: StatusBase | DeviceStatus) -> bool:
for child in self.all_statuses:
if child == status:
return True
if isinstance(child, AndStatusWithList):
if status in child:
return True
return False
# # TODO Check if this actually works....
# def set_exception(self, exc):
# # Propagate the exception to all sub-statuses that are not done yet.
#
# with self._lock:
# if self._externally_initiated_completion:
# return
# if self.done: # Return if status is already done.. It must be resolved already
# return
# super().set_exception(exc)
# for st in self.all_statuses:
# with st._lock:
# if not st.done:
# st.set_exception(exc)
def _run_callbacks(self):
"""
Set the Event and run the callbacks.
"""
if self.timeout is None:
timeout = None
else:
timeout = self.timeout + self.settle_time
if not self._settled_event.wait(timeout):
self.log.warning("%r has timed out", self)
with self._externally_initiated_completion_lock:
if self._exception is None:
exc = TimeoutError(
f"AndStatus from device {self.device.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
)
self._exception = exc
# Mark this as "settled".
try:
self._settled()
except Exception:
self.log.exception("%r encountered error during _settled()", self)
with self._lock:
self._event.set()
if self._exception is not None:
try:
self._handle_failure()
except Exception:
self.log.exception("%r encountered an error during _handle_failure()", self)
for cb in self._callbacks:
try:
cb(self)
except Exception:
self.log.exception(
"An error was raised on a background thread while "
"running the callback %r(%r).",
cb,
self,
)
self._callbacks.clear()
class AndStatus(StatusBase):
"""Custom AndStatus for TimePix detector."""
def __init__(
self,
left: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None,
name: str | Device | None = None,
right: StatusBase | DeviceStatus | list[StatusBase | DeviceStatus] | None = None,
**kwargs,
):
self.left = left if isinstance(left, list) else [left]
if right is not None:
self.right = right if isinstance(right, list) else [right]
else:
self.right = []
self.all_statuses = self.left + self.right
if name is None:
name = "unname_status"
elif isinstance(name, Device):
name = name.name
else:
name = name
self.name = name
super().__init__(**kwargs)
self._trace_attributes["left"] = [st._trace_attributes for st in self.left]
self._trace_attributes["right"] = [st._trace_attributes for st in self.right]
def inner(status):
with self._lock:
if self._externally_initiated_completion:
return
if self.done: # Return if status is already done.. It must be resolved already
return
for st in self.all_statuses:
with st._lock:
if st.done and not st.success:
self.set_exception(st.exception()) # st._exception
return
if all(st.done for st in self.all_statuses) and all(
st.success for st in self.all_statuses
):
self.set_finished()
for st in self.all_statuses:
with st._lock:
st.add_callback(inner)
def __repr__(self):
return "({self.left!r} & {self.right!r})".format(self=self)
def __str__(self):
return "{0}(done={1.done}, " "success={1.success})" "".format(self.__class__.__name__, self)
def __contains__(self, status: StatusBase) -> bool:
for child in [self.left, self.right]:
if child == status:
return True
if isinstance(child, AndStatus):
if status in child:
return True
return False
def _run_callbacks(self):
"""
Set the Event and run the callbacks.
"""
if self.timeout is None:
timeout = None
else:
timeout = self.timeout + self.settle_time
if not self._settled_event.wait(timeout):
# We have timed out. It's possible that set_finished() has already
# been called but we got here before the settle_time timer expired.
# And it's possible that in this space be between the above
# statement timing out grabbing the lock just below,
# set_exception(exc) has been called. Both of these possibilties
# are accounted for.
self.log.warning("%r has timed out", self)
with self._externally_initiated_completion_lock:
# Set the exception and mark the Status as done, unless
# set_exception(exc) was called externally before we grabbed
# the lock.
if self._exception is None:
exc = TimeoutError(
f"Status with name {self.name} failed to complete in specified timeout of {self.timeout + self.settle_time}."
)
self._exception = exc
# Mark this as "settled".
try:
self._settled()
except Exception:
# No alternative but to log this. We can't supersede set_exception,
# and we have to continue and run the callbacks.
self.log.exception("%r encountered error during _settled()", self)
# Now we know whether or not we have succeed or failed, either by
# timeout above or by set_exception(exc), so we can set the Event that
# will mark this Status as done.
with self._lock:
self._event.set()
if self._exception is not None:
try:
self._handle_failure()
except Exception:
self.log.exception("%r encountered an error during _handle_failure()", self)
# The callbacks have access to self, from which they can distinguish
# success or failure.
for cb in self._callbacks:
try:
cb(self)
except Exception:
self.log.exception(
"An error was raised on a background thread while "
"running the callback %r(%r).",
cb,
self,
)
self._callbacks.clear()

View File

@@ -10,8 +10,6 @@ from ophyd import EpicsSignal, EpicsSignalRO
from ophyd_devices import CompareStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from debye_bec.devices.pilatus.utils import AndStatusWithList
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
@@ -83,8 +81,8 @@ class PilatusCurtain(PSIDeviceBase):
self.open_cover.put(1)
# TODO timeout ok?
status_open = CompareStatus(self.cover_is_open, COVER.OPEN, timeout=5)
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation="!=")
status = AndStatusWithList(device=self, status_list=[status_open, status_error])
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation_success="!=")
status = status_open & status_error
return status
else:
return None
@@ -95,8 +93,8 @@ class PilatusCurtain(PSIDeviceBase):
self.close_cover.put(1)
# TODO timeout ok?
status_close = CompareStatus(self.cover_is_closed, COVER.CLOSED, timeout=5)
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation="!=")
status = AndStatusWithList(device=self, status_list=[status_close, status_error])
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation_success="!=")
status = status_close & status_error
return status
else:
return None

View File

@@ -1,10 +1,8 @@
# pylint: skip-file
import os
import threading
from typing import TYPE_CHECKING, Generator
from unittest import mock
import numpy as np
import ophyd
import pytest
from bec_lib.messages import ScanStatusMessage
@@ -177,7 +175,7 @@ def test_pilatus_on_trigger_cancel_on_stop(pilatus):
status.wait(timeout=5)
def test_pilatus_on_complete(pilatus):
def test_pilatus_on_complete(pilatus: Pilatus):
"""Test the on_complete logic of the Pilatus detector."""
if pilatus.scan_info.msg.scan_name.startswith("xas"):