fix: remove DummyDetector import from init; crashes pipeline due to syntax error

This commit is contained in:
2024-08-28 16:09:03 +02:00
parent 5ba10cdff8
commit 66eff8e5f0
2 changed files with 63 additions and 70 deletions

View File

@ -1,2 +1 @@
from .phoenix_trigger import PhoenixTrigger from .phoenix_trigger import PhoenixTrigger
from .dummy_devices import Dummy_PSIDetector

View File

@ -13,26 +13,27 @@ from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import FileWriter from bec_lib.file_utils import FileWriter
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd import Component, Device, DeviceStatus, Kind from ophyd import Component
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO
from ophyd import FormattedComponent as FCpt
from ophyd import Kind
from ophyd.device import Staged from ophyd.device import Staged
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from ophyd_devices.sim.sim_signals import SetableSignal from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd_devices.utils import bec_utils from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from ophyd import Component as Cpt
from ophyd import FormattedComponent as FCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO
from phoenix_bec.scripts.phoenix import PhoenixBL from phoenix_bec.scripts.phoenix import PhoenixBL
logger = bec_logger.logger logger = bec_logger.logger
# class LogTime():
#class LogTime():
# def __init__(self): # def __init__(self):
# self.t0=time.time() # self.t0=time.time()
@ -48,8 +49,8 @@ logger = bec_logger.logger
# file.close # file.close
p_s = PhoenixBL.my_log
p_s=PhoenixBL.my_log
class DetectorInitError(Exception): class DetectorInitError(Exception):
"""Raised when initiation of the device class fails, """Raised when initiation of the device class fails,
@ -74,8 +75,10 @@ class SetupDummy(CustomDetectorMixin):
self.parent = parent self.parent = parent
def on_init(self) -> None: def on_init(self) -> None:
""" """ """
def on_stage(self) -> None:e is writing data on disk, this step should include publishing
def on_stage(self) -> None:
"""e is writing data on disk, this step should include publishing
a file_event and file_message to BEC to inform the system where the data is written to. a file_event and file_message to BEC to inform the system where the data is written to.
IMPORTANT: IMPORTANT:
@ -217,7 +220,7 @@ class SetupDummy(CustomDetectorMixin):
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds timeout (float): timeout in seconds
check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value
rue if stopped flag should be checked rue if stopped flag should be checked
interval (float): interval in seconds interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout exception_on_timeout (Exception): Exception to raise on timeout
@ -310,17 +313,15 @@ class Dummy_PSIDetector(PSIDetectorBase):
custom_prepare_cls = SetupDummy custom_prepare_cls = SetupDummy
#prefix=X07MB-PC-PSCAN # prefix=X07MB-PC-PSCAN
D = Cpt(EpicsSignal, 'P-P0D0') # cont on / off
D = Cpt(EpicsSignal, "P-P0D0") # cont on / off
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs): def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
self.p_s=PhoenixBL.my_log #must be before super!!! self.p_s = PhoenixBL.my_log # must be before super!!!
self.p_s('Dummy_device Dummy_PSIDetector.__init__ ') self.p_s("Dummy_device Dummy_PSIDetector.__init__ ")
super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs) super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs)
@ -345,28 +346,25 @@ class Dummy_PSIDetector(PSIDetectorBase):
self._update_scaninfo() self._update_scaninfo()
self._update_filewriter() self._update_filewriter()
self._init() self._init()
#.. prepare my own log file # .. prepare my own log file
self.p_s('Dummy_device Dummy_PSIDetector.__init__ .. done ')
self.p_s("Dummy_device Dummy_PSIDetector.__init__ .. done ")
def _update_filewriter(self) -> None: def _update_filewriter(self) -> None:
"""Update filewriter with service config""" """Update filewriter with service config"""
self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter') self.p_s("Dummy_device Dummy_PSIDetector._update_filewriter")
self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector) self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector)
self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter .. done ') self.p_s("Dummy_device Dummy_PSIDetector._update_filewriter .. done ")
def _update_scaninfo(self) -> None: def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing """Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode This depends on device manager and operation/sim_mode
""" """
self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo') self.p_s("Dummy_device Dummy_PSIDetector._update_scaninfo")
self.scaninfo = BecScaninfoMixin(self.device_manager) self.scaninfo = BecScaninfoMixin(self.device_manager)
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo .. done ') self.p_s("Dummy_device Dummy_PSIDetector._update_scaninfo .. done ")
def _update_service_config(self) -> None: def _update_service_config(self) -> None:
"""Update service config from BEC service config """Update service config from BEC service config
@ -376,32 +374,31 @@ class Dummy_PSIDetector(PSIDetectorBase):
# pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel
from bec_lib.bec_service import SERVICE_CONFIG from bec_lib.bec_service import SERVICE_CONFIG
self.p_s('Dummy_device Dummy_PSIDetector._update_service_config')
self.p_s("Dummy_device Dummy_PSIDetector._update_service_config")
if SERVICE_CONFIG: if SERVICE_CONFIG:
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
return return
self.service_cfg = {"base_path": os.path.abspath(".")} self.service_cfg = {"base_path": os.path.abspath(".")}
self.p_s('Dummy_device Dummy_PSIDetector._update_service_config .. done') self.p_s("Dummy_device Dummy_PSIDetector._update_service_config .. done")
def check_scan_id(self) -> None: def check_scan_id(self) -> None:
"""Checks if scan_id has changed and set stopped flagged to True if it has.""" """Checks if scan_id has changed and set stopped flagged to True if it has."""
self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id') self.p_s("Dummy_device Dummy_PSIDetector.check_scan_id")
old_scan_id = self.scaninfo.scan_id old_scan_id = self.scaninfo.scan_id
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
if self.scaninfo.scan_id != old_scan_id: if self.scaninfo.scan_id != old_scan_id:
self.stopped = True self.stopped = True
self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id .. done ') self.p_s("Dummy_device Dummy_PSIDetector.check_scan_id .. done ")
def _init(self) -> None: def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters""" """Initialize detector, filewriter and set default parameters"""
self.p_s('Dummy_device Dummy_PSIDetector._init') self.p_s("Dummy_device Dummy_PSIDetector._init")
self.custom_prepare.on_init() self.custom_prepare.on_init()
self.p_s('Dummy_device Dummy_PSIDetector._init ... done ') self.p_s("Dummy_device Dummy_PSIDetector._init ... done ")
def stage(self) -> list[object]: def stage(self) -> list[object]:
""" """
@ -414,15 +411,14 @@ class Dummy_PSIDetector(PSIDetectorBase):
list(object): list of objects that were staged list(object): list of objects that were staged
""" """
self.p_s('Dummy_device Dummy_PSIDetector.stage') self.p_s("Dummy_device Dummy_PSIDetector.stage")
if self._staged != Staged.no: if self._staged != Staged.no:
return super().stage() return super().stage()
self.stopped = False self.stopped = False
self.scaninfo.load_scan_metadata() self.scaninfo.load_scan_metadata()
self.custom_prepare.on_stage() self.custom_prepare.on_stage()
self.p_s('Dummy_device Dummy_PSIDetector.stage done ') self.p_s("Dummy_device Dummy_PSIDetector.stage done ")
return super().stage() return super().stage()
@ -433,22 +429,21 @@ class Dummy_PSIDetector(PSIDetectorBase):
time-critical actions. Therefore, it should also be kept as short/fast as possible. time-critical actions. Therefore, it should also be kept as short/fast as possible.
I.e. Arming a detector in case there is a risk of timing out. I.e. Arming a detector in case there is a risk of timing out.
""" """
self.p_s('Dummy_device Dummy_PSIDetector.pre_scan') self.p_s("Dummy_device Dummy_PSIDetector.pre_scan")
self.custom_prepare.on_pre_scan() self.custom_prepare.on_pre_scan()
self.p_s('Dummy_device Dummy_PSIDetector.pre_scan .. done ') self.p_s("Dummy_device Dummy_PSIDetector.pre_scan .. done ")
def trigger(self) -> DeviceStatus: def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC.""" """Trigger the detector, called from BEC."""
# pylint: disable=assignment-from-no-return # pylint: disable=assignment-from-no-return
self.p_s('Dummy_device Dummy_PSIDetector.trigger') self.p_s("Dummy_device Dummy_PSIDetector.trigger")
status = self.custom_prepare.on_trigger() status = self.custom_prepare.on_trigger()
if isinstance(status, DeviceStatus): if isinstance(status, DeviceStatus):
return status return status
self.p_s('Dummy_device Dummy_PSIDetector.trigger.. done ') self.p_s("Dummy_device Dummy_PSIDetector.trigger.. done ")
return super().trigger() return super().trigger()
@ -461,14 +456,14 @@ class Dummy_PSIDetector(PSIDetectorBase):
Actions are implemented in custom_prepare.on_complete since they are beamline specific. Actions are implemented in custom_prepare.on_complete since they are beamline specific.
""" """
# pylint: disable=assignment-from-no-return # pylint: disable=assignment-from-no-return
self.p_s('Dummy_device Dummy_PSIDetector.complete') self.p_s("Dummy_device Dummy_PSIDetector.complete")
status = self.custom_prepare.on_complete() status = self.custom_prepare.on_complete()
if isinstance(status, DeviceStatus): if isinstance(status, DeviceStatus):
return status return status
status = DeviceStatus(self) status = DeviceStatus(self)
status.set_finished() status.set_finished()
self.p_s('Dummy_device Dummy_PSIDetector.complete ... done ') self.p_s("Dummy_device Dummy_PSIDetector.complete ... done ")
return status return status
@ -484,11 +479,11 @@ class Dummy_PSIDetector(PSIDetectorBase):
Returns: Returns:
list(object): list of objects that were unstaged list(object): list of objects that were unstaged
""" """
self.p_s('Dummy_device Dummy_PSIDetector.unstage') self.p_s("Dummy_device Dummy_PSIDetector.unstage")
self.check_scan_id() self.check_scan_id()
self.custom_prepare.on_unstage() self.custom_prepare.on_unstage()
self.stopped = False self.stopped = False
self.p_s('Dummy_device Dummy_PSIDetector.unstage .. done') self.p_s("Dummy_device Dummy_PSIDetector.unstage .. done")
return super().unstage() return super().unstage()
@ -497,9 +492,8 @@ class Dummy_PSIDetector(PSIDetectorBase):
Stop the scan, with camera and file writer Stop the scan, with camera and file writer
""" """
self.p_s('Dummy_device Dummy_PSIDetector.stop') self.p_s("Dummy_device Dummy_PSIDetector.stop")
self.custom_prepare.on_stop() self.custom_prepare.on_stop()
super().stop(success=success) super().stop(success=success)
self.stopped = True self.stopped = True
self.p_s('Dummy_device Dummy_PSIDetector.stop ... done') self.p_s("Dummy_device Dummy_PSIDetector.stop ... done")