fix(status): cleanup and remove of old status usage

This commit is contained in:
2025-11-30 22:28:34 +01:00
parent c6ed27966c
commit 0a8272685d
6 changed files with 13 additions and 29 deletions

View File

@@ -9,8 +9,7 @@ from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd.status import DeviceStatus, SubscriptionStatus
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices import CompareStatus, DeviceStatus, SubscriptionStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked

View File

@@ -9,16 +9,15 @@ used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action."""
import time
from typing import Any, Literal
from typing import Literal
from bec_lib.devicemanager import ScanInfo
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd import DeviceStatus, StatusBase
from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError
from pydantic import BaseModel, Field
from typeguard import typechecked

View File

@@ -1,12 +1,11 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal, cast
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase
from ophyd.status import SubscriptionStatus, WaitTimeoutError
from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, ProgressSignal, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.sim.sim_signals import SetableSignal

View File

@@ -17,12 +17,10 @@ from ophyd.areadetector.cam import ADBase, PilatusDetectorCam
from ophyd.areadetector.plugins import HDF5Plugin_V22 as HDF5Plugin
from ophyd.areadetector.plugins import ImagePlugin_V22 as ImagePlugin
from ophyd.status import WaitTimeoutError
from ophyd_devices import CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal
from ophyd_devices import AndStatus, CompareStatus, DeviceStatus, FileEventSignal, PreviewSignal
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from pydantic import BaseModel, Field
from debye_bec.devices.pilatus.utils import AndStatusWithList
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_lib.messages import DevicePreviewMessage, ScanStatusMessage
@@ -360,14 +358,12 @@ class Pilatus(PSIDeviceBase, ADBase):
# f"Live Mode on detector {self.name} did not stop: {content} after 10s."
# )
def check_detector_stop_running_acquisition(self) -> AndStatusWithList:
def check_detector_stop_running_acquisition(self) -> AndStatus:
"""Check if the detector is still running an acquisition."""
status_acquire = CompareStatus(self.cam.acquire, ACQUIREMODE.DONE.value)
status_writing = CompareStatus(self.hdf.capture, ACQUIREMODE.DONE.value)
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.UNARMED.value)
status = AndStatusWithList(
device=self, status_list=[status_acquire, status_writing, status_cam_server]
)
status = status_acquire & status_writing & status_cam_server
return status
def _calculate_trigger(self, scan_msg: ScanStatusMessage):
@@ -562,9 +558,7 @@ class Pilatus(PSIDeviceBase, ADBase):
status_hdf = CompareStatus(self.hdf.capture, ACQUIREMODE.ACQUIRING.value)
status_cam = CompareStatus(self.cam.acquire, ACQUIREMODE.ACQUIRING.value)
status_cam_server = CompareStatus(self.cam.armed, DETECTORSTATE.ARMED.value)
status = AndStatusWithList(
device=self, status_list=[status_hdf, status_cam, status_cam_server]
)
status = status_hdf & status_cam & status_cam_server
self.cam.acquire.put(1)
self.hdf.capture.put(1)
self.cancel_on_stop(status)
@@ -627,10 +621,7 @@ class Pilatus(PSIDeviceBase, ADBase):
else:
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
status_img_written = CompareStatus(self.hdf.num_captured, self.n_images)
status = AndStatusWithList(
device=self,
status_list=[status_hdf, status_cam, status_img_written, status_cam_server],
)
status = status_hdf & status_cam & status_img_written & status_cam_server
status.add_callback(self._complete_callback) # Callback that writing was successful
self.cancel_on_stop(status)
return status

View File

@@ -10,8 +10,6 @@ from ophyd import EpicsSignal, EpicsSignalRO
from ophyd_devices import CompareStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from debye_bec.devices.pilatus.utils import AndStatusWithList
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
@@ -84,7 +82,7 @@ class PilatusCurtain(PSIDeviceBase):
# TODO timeout ok?
status_open = CompareStatus(self.cover_is_open, COVER.OPEN, timeout=5)
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation_success="!=")
status = AndStatusWithList(device=self, status_list=[status_open, status_error])
status = status_open & status_error
return status
else:
return None
@@ -96,7 +94,7 @@ class PilatusCurtain(PSIDeviceBase):
# TODO timeout ok?
status_close = CompareStatus(self.cover_is_closed, COVER.CLOSED, timeout=5)
status_error = CompareStatus(self.cover_error, COVER.ERROR, operation_success="!=")
status = AndStatusWithList(device=self, status_list=[status_close, status_error])
status = status_close & status_error
return status
else:
return None

View File

@@ -1,10 +1,8 @@
# pylint: skip-file
import os
import threading
from typing import TYPE_CHECKING, Generator
from unittest import mock
import numpy as np
import ophyd
import pytest
from bec_lib.messages import ScanStatusMessage