From 66eff8e5f0e03ac5117d8474ede8612b8ae5cc1f Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 28 Aug 2024 16:09:03 +0200 Subject: [PATCH] fix: remove DummyDetector import from init; crashes pipeline due to syntax error --- phoenix_bec/devices/__init__.py | 1 - phoenix_bec/devices/dummy_devices.py | 132 +++++++++++++-------------- 2 files changed, 63 insertions(+), 70 deletions(-) diff --git a/phoenix_bec/devices/__init__.py b/phoenix_bec/devices/__init__.py index 0893a2c..baf95ee 100644 --- a/phoenix_bec/devices/__init__.py +++ b/phoenix_bec/devices/__init__.py @@ -1,2 +1 @@ from .phoenix_trigger import PhoenixTrigger -from .dummy_devices import Dummy_PSIDetector diff --git a/phoenix_bec/devices/dummy_devices.py b/phoenix_bec/devices/dummy_devices.py index fc952cf..05bcb79 100644 --- a/phoenix_bec/devices/dummy_devices.py +++ b/phoenix_bec/devices/dummy_devices.py @@ -13,26 +13,27 @@ from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_lib.file_utils import FileWriter from bec_lib.logger import bec_logger -from ophyd import Component, Device, DeviceStatus, Kind +from ophyd import Component +from ophyd import Component as Cpt +from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO +from ophyd import FormattedComponent as FCpt +from ophyd import Kind from ophyd.device import Staged - +from ophyd_devices.interfaces.base_classes.psi_detector_base import ( + CustomDetectorMixin, + PSIDetectorBase, +) from ophyd_devices.sim.sim_signals import SetableSignal from ophyd_devices.utils import bec_utils from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError -from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin - -from ophyd import Component as Cpt -from ophyd import FormattedComponent as FCpt -from ophyd import Device, EpicsSignal, EpicsSignalRO from phoenix_bec.scripts.phoenix import PhoenixBL logger = bec_logger.logger - -#class LogTime(): +# class LogTime(): # def __init__(self): # self.t0=time.time() @@ -48,8 +49,8 @@ logger = bec_logger.logger # file.close +p_s = PhoenixBL.my_log -p_s=PhoenixBL.my_log class DetectorInitError(Exception): """Raised when initiation of the device class fails, @@ -74,8 +75,10 @@ class SetupDummy(CustomDetectorMixin): self.parent = parent def on_init(self) -> None: - """ - def on_stage(self) -> None:e is writing data on disk, this step should include publishing + """ """ + + def on_stage(self) -> None: + """e is writing data on disk, this step should include publishing a file_event and file_message to BEC to inform the system where the data is written to. IMPORTANT: @@ -203,27 +206,27 @@ class SetupDummy(CustomDetectorMixin): exception_on_timeout: Exception = None, ) -> DeviceStatus: """Utility function to wait for signals in a thread. - Returns a DevicesStatus object that resolves either to set_finished or set_exception. - The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase. + Returns a DevicesStatus object that resolves either to set_finished or set_exception. + The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase. - Usage: - This function should be used to wait for signals to reach a certain condition, especially in the context of - on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC. - It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met, - the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception. - The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError. + Usage: + This function should be used to wait for signals to reach a certain condition, especially in the context of + on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC. + It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met, + the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception. + The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError. - Args: - signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check - timeout (float): timeout in seconds - check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value -rue if stopped flag should be checked - interval (float): interval in seconds - all_signals (bool): True if all signals should be True, False if any signal should be True - exception_on_timeout (Exception): Exception to raise on timeout + Args: + signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + timeout (float): timeout in seconds + check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value + rue if stopped flag should be checked + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + exception_on_timeout (Exception): Exception to raise on timeout - Returns: - DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception + Returns: + DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception """ if exception_on_timeout is None: exception_on_timeout = DeviceTimeoutError( @@ -308,19 +311,17 @@ class Dummy_PSIDetector(PSIDetectorBase): filepath = Component(SetableSignal, value="", kind=Kind.config) - custom_prepare_cls = SetupDummy + custom_prepare_cls = SetupDummy - #prefix=X07MB-PC-PSCAN - - - D = Cpt(EpicsSignal, 'P-P0D0') # cont on / off + # prefix=X07MB-PC-PSCAN + D = Cpt(EpicsSignal, "P-P0D0") # cont on / off def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs): - self.p_s=PhoenixBL.my_log #must be before super!!! + self.p_s = PhoenixBL.my_log # must be before super!!! - self.p_s('Dummy_device Dummy_PSIDetector.__init__ ') + self.p_s("Dummy_device Dummy_PSIDetector.__init__ ") super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs) @@ -345,28 +346,25 @@ class Dummy_PSIDetector(PSIDetectorBase): self._update_scaninfo() self._update_filewriter() self._init() - #.. prepare my own log file - - self.p_s('Dummy_device Dummy_PSIDetector.__init__ .. done ') + # .. prepare my own log file + self.p_s("Dummy_device Dummy_PSIDetector.__init__ .. done ") def _update_filewriter(self) -> None: """Update filewriter with service config""" - self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter') + self.p_s("Dummy_device Dummy_PSIDetector._update_filewriter") self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector) - self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter .. done ') - - + self.p_s("Dummy_device Dummy_PSIDetector._update_filewriter .. done ") def _update_scaninfo(self) -> None: """Update scaninfo from BecScaninfoMixing This depends on device manager and operation/sim_mode """ - self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo') + self.p_s("Dummy_device Dummy_PSIDetector._update_scaninfo") self.scaninfo = BecScaninfoMixin(self.device_manager) self.scaninfo.load_scan_metadata() - self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo .. done ') + self.p_s("Dummy_device Dummy_PSIDetector._update_scaninfo .. done ") def _update_service_config(self) -> None: """Update service config from BEC service config @@ -376,32 +374,31 @@ class Dummy_PSIDetector(PSIDetectorBase): # pylint: disable=import-outside-toplevel from bec_lib.bec_service import SERVICE_CONFIG - self.p_s('Dummy_device Dummy_PSIDetector._update_service_config') + + self.p_s("Dummy_device Dummy_PSIDetector._update_service_config") if SERVICE_CONFIG: self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] return self.service_cfg = {"base_path": os.path.abspath(".")} - self.p_s('Dummy_device Dummy_PSIDetector._update_service_config .. done') + self.p_s("Dummy_device Dummy_PSIDetector._update_service_config .. done") def check_scan_id(self) -> None: """Checks if scan_id has changed and set stopped flagged to True if it has.""" - self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id') + self.p_s("Dummy_device Dummy_PSIDetector.check_scan_id") old_scan_id = self.scaninfo.scan_id self.scaninfo.load_scan_metadata() if self.scaninfo.scan_id != old_scan_id: self.stopped = True - self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id .. done ') - + self.p_s("Dummy_device Dummy_PSIDetector.check_scan_id .. done ") def _init(self) -> None: """Initialize detector, filewriter and set default parameters""" - self.p_s('Dummy_device Dummy_PSIDetector._init') + self.p_s("Dummy_device Dummy_PSIDetector._init") self.custom_prepare.on_init() - self.p_s('Dummy_device Dummy_PSIDetector._init ... done ') - + self.p_s("Dummy_device Dummy_PSIDetector._init ... done ") def stage(self) -> list[object]: """ @@ -414,15 +411,14 @@ class Dummy_PSIDetector(PSIDetectorBase): list(object): list of objects that were staged """ - self.p_s('Dummy_device Dummy_PSIDetector.stage') + self.p_s("Dummy_device Dummy_PSIDetector.stage") if self._staged != Staged.no: return super().stage() self.stopped = False self.scaninfo.load_scan_metadata() self.custom_prepare.on_stage() - self.p_s('Dummy_device Dummy_PSIDetector.stage done ') - + self.p_s("Dummy_device Dummy_PSIDetector.stage done ") return super().stage() @@ -433,22 +429,21 @@ class Dummy_PSIDetector(PSIDetectorBase): time-critical actions. Therefore, it should also be kept as short/fast as possible. I.e. Arming a detector in case there is a risk of timing out. """ - self.p_s('Dummy_device Dummy_PSIDetector.pre_scan') + self.p_s("Dummy_device Dummy_PSIDetector.pre_scan") self.custom_prepare.on_pre_scan() - self.p_s('Dummy_device Dummy_PSIDetector.pre_scan .. done ') - + self.p_s("Dummy_device Dummy_PSIDetector.pre_scan .. done ") def trigger(self) -> DeviceStatus: """Trigger the detector, called from BEC.""" # pylint: disable=assignment-from-no-return - self.p_s('Dummy_device Dummy_PSIDetector.trigger') + self.p_s("Dummy_device Dummy_PSIDetector.trigger") status = self.custom_prepare.on_trigger() if isinstance(status, DeviceStatus): return status - self.p_s('Dummy_device Dummy_PSIDetector.trigger.. done ') + self.p_s("Dummy_device Dummy_PSIDetector.trigger.. done ") return super().trigger() @@ -461,14 +456,14 @@ class Dummy_PSIDetector(PSIDetectorBase): Actions are implemented in custom_prepare.on_complete since they are beamline specific. """ # pylint: disable=assignment-from-no-return - self.p_s('Dummy_device Dummy_PSIDetector.complete') + self.p_s("Dummy_device Dummy_PSIDetector.complete") status = self.custom_prepare.on_complete() if isinstance(status, DeviceStatus): return status status = DeviceStatus(self) status.set_finished() - self.p_s('Dummy_device Dummy_PSIDetector.complete ... done ') + self.p_s("Dummy_device Dummy_PSIDetector.complete ... done ") return status @@ -484,11 +479,11 @@ class Dummy_PSIDetector(PSIDetectorBase): Returns: list(object): list of objects that were unstaged """ - self.p_s('Dummy_device Dummy_PSIDetector.unstage') + self.p_s("Dummy_device Dummy_PSIDetector.unstage") self.check_scan_id() self.custom_prepare.on_unstage() self.stopped = False - self.p_s('Dummy_device Dummy_PSIDetector.unstage .. done') + self.p_s("Dummy_device Dummy_PSIDetector.unstage .. done") return super().unstage() @@ -497,9 +492,8 @@ class Dummy_PSIDetector(PSIDetectorBase): Stop the scan, with camera and file writer """ - self.p_s('Dummy_device Dummy_PSIDetector.stop') + self.p_s("Dummy_device Dummy_PSIDetector.stop") self.custom_prepare.on_stop() super().stop(success=success) self.stopped = True - self.p_s('Dummy_device Dummy_PSIDetector.stop ... done') - + self.p_s("Dummy_device Dummy_PSIDetector.stop ... done")