diff --git a/phoenix_bec/devices/__init__.py b/phoenix_bec/devices/__init__.py index 0893a2c..baf95ee 100644 --- a/phoenix_bec/devices/__init__.py +++ b/phoenix_bec/devices/__init__.py @@ -1,2 +1 @@ from .phoenix_trigger import PhoenixTrigger -from .dummy_devices import Dummy_PSIDetector diff --git a/phoenix_bec/devices/dummy_devices.py b/phoenix_bec/devices/dummy_devices.py index fc952cf..05bcb79 100644 --- a/phoenix_bec/devices/dummy_devices.py +++ b/phoenix_bec/devices/dummy_devices.py @@ -13,26 +13,27 @@ from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_lib.file_utils import FileWriter from bec_lib.logger import bec_logger -from ophyd import Component, Device, DeviceStatus, Kind +from ophyd import Component +from ophyd import Component as Cpt +from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO +from ophyd import FormattedComponent as FCpt +from ophyd import Kind from ophyd.device import Staged - +from ophyd_devices.interfaces.base_classes.psi_detector_base import ( + CustomDetectorMixin, + PSIDetectorBase, +) from ophyd_devices.sim.sim_signals import SetableSignal from ophyd_devices.utils import bec_utils from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError -from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin - -from ophyd import Component as Cpt -from ophyd import FormattedComponent as FCpt -from ophyd import Device, EpicsSignal, EpicsSignalRO from phoenix_bec.scripts.phoenix import PhoenixBL logger = bec_logger.logger - -#class LogTime(): +# class LogTime(): # def __init__(self): # self.t0=time.time() @@ -48,8 +49,8 @@ logger = bec_logger.logger # file.close +p_s = PhoenixBL.my_log -p_s=PhoenixBL.my_log class DetectorInitError(Exception): """Raised when initiation of the device class fails, @@ -74,8 +75,10 @@ class SetupDummy(CustomDetectorMixin): self.parent = parent def on_init(self) -> None: - """ - def on_stage(self) -> None:e is writing data on disk, this step should include publishing + """ """ + + def on_stage(self) -> None: + """e is writing data on disk, this step should include publishing a file_event and file_message to BEC to inform the system where the data is written to. IMPORTANT: @@ -203,27 +206,27 @@ class SetupDummy(CustomDetectorMixin): exception_on_timeout: Exception = None, ) -> DeviceStatus: """Utility function to wait for signals in a thread. - Returns a DevicesStatus object that resolves either to set_finished or set_exception. - The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase. + Returns a DevicesStatus object that resolves either to set_finished or set_exception. + The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase. - Usage: - This function should be used to wait for signals to reach a certain condition, especially in the context of - on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC. - It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met, - the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception. - The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError. + Usage: + This function should be used to wait for signals to reach a certain condition, especially in the context of + on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC. + It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met, + the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception. + The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError. - Args: - signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check - timeout (float): timeout in seconds - check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value -rue if stopped flag should be checked - interval (float): interval in seconds - all_signals (bool): True if all signals should be True, False if any signal should be True - exception_on_timeout (Exception): Exception to raise on timeout + Args: + signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + timeout (float): timeout in seconds + check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value + rue if stopped flag should be checked + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + exception_on_timeout (Exception): Exception to raise on timeout - Returns: - DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception + Returns: + DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception """ if exception_on_timeout is None: exception_on_timeout = DeviceTimeoutError( @@ -308,19 +311,17 @@ class Dummy_PSIDetector(PSIDetectorBase): filepath = Component(SetableSignal, value="", kind=Kind.config) - custom_prepare_cls = SetupDummy + custom_prepare_cls = SetupDummy - #prefix=X07MB-PC-PSCAN - - - D = Cpt(EpicsSignal, 'P-P0D0') # cont on / off + # prefix=X07MB-PC-PSCAN + D = Cpt(EpicsSignal, "P-P0D0") # cont on / off def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs): - self.p_s=PhoenixBL.my_log #must be before super!!! + self.p_s = PhoenixBL.my_log # must be before super!!! - self.p_s('Dummy_device Dummy_PSIDetector.__init__ ') + self.p_s("Dummy_device Dummy_PSIDetector.__init__ ") super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs) @@ -345,28 +346,25 @@ class Dummy_PSIDetector(PSIDetectorBase): self._update_scaninfo() self._update_filewriter() self._init() - #.. prepare my own log file - - self.p_s('Dummy_device Dummy_PSIDetector.__init__ .. done ') + # .. prepare my own log file + self.p_s("Dummy_device Dummy_PSIDetector.__init__ .. done ") def _update_filewriter(self) -> None: """Update filewriter with service config""" - self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter') + self.p_s("Dummy_device Dummy_PSIDetector._update_filewriter") self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector) - self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter .. done ') - - + self.p_s("Dummy_device Dummy_PSIDetector._update_filewriter .. done ") def _update_scaninfo(self) -> None: """Update scaninfo from BecScaninfoMixing This depends on device manager and operation/sim_mode """ - self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo') + self.p_s("Dummy_device Dummy_PSIDetector._update_scaninfo") self.scaninfo = BecScaninfoMixin(self.device_manager) self.scaninfo.load_scan_metadata() - self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo .. done ') + self.p_s("Dummy_device Dummy_PSIDetector._update_scaninfo .. done ") def _update_service_config(self) -> None: """Update service config from BEC service config @@ -376,32 +374,31 @@ class Dummy_PSIDetector(PSIDetectorBase): # pylint: disable=import-outside-toplevel from bec_lib.bec_service import SERVICE_CONFIG - self.p_s('Dummy_device Dummy_PSIDetector._update_service_config') + + self.p_s("Dummy_device Dummy_PSIDetector._update_service_config") if SERVICE_CONFIG: self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] return self.service_cfg = {"base_path": os.path.abspath(".")} - self.p_s('Dummy_device Dummy_PSIDetector._update_service_config .. done') + self.p_s("Dummy_device Dummy_PSIDetector._update_service_config .. done") def check_scan_id(self) -> None: """Checks if scan_id has changed and set stopped flagged to True if it has.""" - self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id') + self.p_s("Dummy_device Dummy_PSIDetector.check_scan_id") old_scan_id = self.scaninfo.scan_id self.scaninfo.load_scan_metadata() if self.scaninfo.scan_id != old_scan_id: self.stopped = True - self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id .. done ') - + self.p_s("Dummy_device Dummy_PSIDetector.check_scan_id .. done ") def _init(self) -> None: """Initialize detector, filewriter and set default parameters""" - self.p_s('Dummy_device Dummy_PSIDetector._init') + self.p_s("Dummy_device Dummy_PSIDetector._init") self.custom_prepare.on_init() - self.p_s('Dummy_device Dummy_PSIDetector._init ... done ') - + self.p_s("Dummy_device Dummy_PSIDetector._init ... done ") def stage(self) -> list[object]: """ @@ -414,15 +411,14 @@ class Dummy_PSIDetector(PSIDetectorBase): list(object): list of objects that were staged """ - self.p_s('Dummy_device Dummy_PSIDetector.stage') + self.p_s("Dummy_device Dummy_PSIDetector.stage") if self._staged != Staged.no: return super().stage() self.stopped = False self.scaninfo.load_scan_metadata() self.custom_prepare.on_stage() - self.p_s('Dummy_device Dummy_PSIDetector.stage done ') - + self.p_s("Dummy_device Dummy_PSIDetector.stage done ") return super().stage() @@ -433,22 +429,21 @@ class Dummy_PSIDetector(PSIDetectorBase): time-critical actions. Therefore, it should also be kept as short/fast as possible. I.e. Arming a detector in case there is a risk of timing out. """ - self.p_s('Dummy_device Dummy_PSIDetector.pre_scan') + self.p_s("Dummy_device Dummy_PSIDetector.pre_scan") self.custom_prepare.on_pre_scan() - self.p_s('Dummy_device Dummy_PSIDetector.pre_scan .. done ') - + self.p_s("Dummy_device Dummy_PSIDetector.pre_scan .. done ") def trigger(self) -> DeviceStatus: """Trigger the detector, called from BEC.""" # pylint: disable=assignment-from-no-return - self.p_s('Dummy_device Dummy_PSIDetector.trigger') + self.p_s("Dummy_device Dummy_PSIDetector.trigger") status = self.custom_prepare.on_trigger() if isinstance(status, DeviceStatus): return status - self.p_s('Dummy_device Dummy_PSIDetector.trigger.. done ') + self.p_s("Dummy_device Dummy_PSIDetector.trigger.. done ") return super().trigger() @@ -461,14 +456,14 @@ class Dummy_PSIDetector(PSIDetectorBase): Actions are implemented in custom_prepare.on_complete since they are beamline specific. """ # pylint: disable=assignment-from-no-return - self.p_s('Dummy_device Dummy_PSIDetector.complete') + self.p_s("Dummy_device Dummy_PSIDetector.complete") status = self.custom_prepare.on_complete() if isinstance(status, DeviceStatus): return status status = DeviceStatus(self) status.set_finished() - self.p_s('Dummy_device Dummy_PSIDetector.complete ... done ') + self.p_s("Dummy_device Dummy_PSIDetector.complete ... done ") return status @@ -484,11 +479,11 @@ class Dummy_PSIDetector(PSIDetectorBase): Returns: list(object): list of objects that were unstaged """ - self.p_s('Dummy_device Dummy_PSIDetector.unstage') + self.p_s("Dummy_device Dummy_PSIDetector.unstage") self.check_scan_id() self.custom_prepare.on_unstage() self.stopped = False - self.p_s('Dummy_device Dummy_PSIDetector.unstage .. done') + self.p_s("Dummy_device Dummy_PSIDetector.unstage .. done") return super().unstage() @@ -497,9 +492,8 @@ class Dummy_PSIDetector(PSIDetectorBase): Stop the scan, with camera and file writer """ - self.p_s('Dummy_device Dummy_PSIDetector.stop') + self.p_s("Dummy_device Dummy_PSIDetector.stop") self.custom_prepare.on_stop() super().stop(success=success) self.stopped = True - self.p_s('Dummy_device Dummy_PSIDetector.stop ... done') - + self.p_s("Dummy_device Dummy_PSIDetector.stop ... done") diff --git a/phoenix_bec/devices/phoenix_trigger.py b/phoenix_bec/devices/phoenix_trigger.py index 06783cd..d4fe1c7 100644 --- a/phoenix_bec/devices/phoenix_trigger.py +++ b/phoenix_bec/devices/phoenix_trigger.py @@ -1,18 +1,18 @@ -import time +""" Module for the PhoenixTrigger class to connect to the ADC card +that creates TTL signals to trigger cameras and detectors at Phoenix. """ import enum -import numpy as np +import time +import numpy as np +from bec_lib import bec_logger from ophyd import Component as Cpt from ophyd import DeviceStatus, EpicsSignal, EpicsSignalRO, Kind - from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - PSIDetectorBase, CustomDetectorMixin, + PSIDetectorBase, ) -from bec_lib import bec_logger - logger = bec_logger.logger DETECTOR_TIMEOUT = 5 @@ -22,7 +22,7 @@ class PhoenixTriggerError(Exception): """PhoenixTrigger specific error""" -class SAMPLINGDONE(int, enum.Enum): +class SAMPLING(int, enum.Enum): """Sampling Done PV""" RUNNING = 0 @@ -31,13 +31,11 @@ class SAMPLINGDONE(int, enum.Enum): class PhoenixTriggerSetup(CustomDetectorMixin): """ - This defines the PHOENIX trigger setup. - - + Mixin Class to setup the PhoenixTrigger device """ def on_stage(self) -> None: - """Actions to take place on stage""" + """On stage actions which are executed upon staging the device""" if self.parent.scaninfo.scan_type == "step": self.parent.start_csmpl.set(0) self.parent.total_cycles.set(1) @@ -47,19 +45,21 @@ class PhoenixTriggerSetup(CustomDetectorMixin): logger.info(f"Device {self.parent.name} was staged for step scan") def on_unstage(self) -> None: - """Actions to take place on unstage""" + """On unstage actions which are executed upon unstaging the device""" self.on_stop() def on_trigger(self) -> DeviceStatus: - """Actions to be performed upon receiving a software trigger""" + """On trigger actions which are executed upon triggering the device""" + # TODO Test the proper check for the falcon state # Check first that falcon is set to acquiring falcon = self.parent.device_manager.devices.get("falcon_nohdf5", None) + timeout = 1 if falcon is not None: - if self.wait_for_signals([(falcon.state.get, 1)], timeout=1): + # TODO Check that falcon.state.get() == 1 is the correct check. --> When is the falcon acquiring, this assumes 1? + if not self.wait_for_signals([(falcon.state.get, 1)], timeout=timeout): raise PhoenixTriggerError( - f"Falcon not ready to take trigger after 1s timeout in trigger" + f"Device {self.parent.name} is not ready to take trigger, timeout due to waiting for Falcon to get ready. Timeout after {timeout}s" ) - falcon.state.get() == 1 # Acquiring if self.parent.scaninfo.scan_type == "step": time.sleep(0.2) self.parent.smpl.put(1) @@ -67,7 +67,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin): time.sleep(0.2) # Trigger function from ophyd.Device returns a DeviceStatus. This function # starts a process that creates a DeviceStatus, and waits for the signal_conditions - # self.parent.smpl_done.get to change to the value SAMPLINGDONE.DONE + # self.parent.smpl_done.get to change to the value SAMPLING.DONE # Once this takes place, the DeviceStatus.done flag will be set to True. # When BEC calls trigger() on the devices, this method will be called assuming that # the devices config softwareTrigger=True is set. @@ -75,7 +75,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin): # self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) # which ensures that the DeviceStatus object resolves before continuing, i.e. DeviceStatus.done = True status = self.wait_with_status( - signal_conditions=[(self.parent.smpl_done.get, SAMPLINGDONE.DONE)], + signal_conditions=[(self.parent.smpl_done.get, SAMPLING.DONE)], timeout=5 * self.parent.scaninfo.exp_time, # Check if timeout is appropriate check_stopped=True, ) @@ -83,24 +83,23 @@ class PhoenixTriggerSetup(CustomDetectorMixin): def on_stop(self) -> None: """Actions to stop the Device""" - # Put the Device in cont mode + # Put the Device again in continous acquisition mode self.parent.total_cycles.set(5) self.parent.start_csmpl.set(1) self.parent.smpl.put(1) time.sleep(0.5) self.parent.smpl.put(1) time.sleep(0.2) - if self.parent.smpl_done.get() == SAMPLINGDONE.RUNNING: + if self.parent.smpl_done.get() == SAMPLING.RUNNING: return self.parent.smpl.put(1) class PhoenixTrigger(PSIDetectorBase): """ - Docstring: - - Class for PHOENIX TTL hardware trigger (X07MB-OP2:) + Class for PHOENIX TTL hardware trigger: 'X07MB-OP2:' + This device is used to trigger communicate with an ADC card that creates TTL signals to trigger cameras and detectors at Phoenix. """ custom_prepare_cls = PhoenixTriggerSetup @@ -123,15 +122,13 @@ class PhoenixTrigger(PSIDetectorBase): if __name__ == "__main__": + # Test the PhoenixTrigger class trigger = PhoenixTrigger(name="trigger", prefix="X07MB-OP2:") trigger.wait_for_connection(all_signals=True) trigger.read() trigger.read_configuration() trigger.stage() - status = trigger.trigger() - while status.done is False: - print(f" Waiting for status, flag is {status.done}") - time.sleep(0.2) - + device_status = trigger.trigger() + device_status.wait() trigger.unstage() diff --git a/phoenix_bec/scans/phoenix_scans.py b/phoenix_bec/scans/phoenix_scans.py index dbbad2e..578772c 100644 --- a/phoenix_bec/scans/phoenix_scans.py +++ b/phoenix_bec/scans/phoenix_scans.py @@ -23,90 +23,87 @@ but they are executed in a specific order: """ # imports in ScanBase -#from __future__ import annotations +# from __future__ import annotations -#import ast -#import enum -#import threading -#import time -#import uuid -#from abc import ABC, abstractmethod -#from typing import Any, Literal +# import ast +# import enum +# import threading +# import time +# import uuid +# from abc import ABC, abstractmethod +# from typing import Any, Literal -#import numpy as np +# import numpy as np -#from bec_lib.device import DeviceBase -#from bec_lib.devicemanager import DeviceManagerBase -#from bec_lib.endpoints import MessageEndpoints -#from bec_lib.logger import bec_logger +# from bec_lib.device import DeviceBase +# from bec_lib.devicemanager import DeviceManagerBase +# from bec_lib.endpoints import MessageEndpoints +# from bec_lib.logger import bec_logger -#from .errors import LimitError, ScanAbortion -#from .path_optimization import PathOptimizerMixin -#from .scan_stubs import ScanStubs +# from .errors import LimitError, ScanAbortion +# from .path_optimization import PathOptimizerMixin +# from .scan_stubs import ScanStubs # end imports in ScanBase # import time # import numpy as np +import time + +import numpy as np + # from bec_lib.endpoints import MessageEndpoints from bec_lib.logger import bec_logger +from bec_server.scan_server.scans import ScanArgType, ScanBase + +from phoenix_bec.scripts.phoenix import PhoenixBL + # from bec_lib import messages # from bec_server.scan_server.errors import ScanAbortion # from bec_server.scan_server.scans import FlyScanBase, RequestBase, ScanArgType, ScanBase # logger = bec_logger.logger -from bec_server.scan_server.scans import ScanBase, ScanArgType -import numpy as np -import time -from bec_lib.logger import bec_logger -from phoenix_bec.scripts.phoenix import PhoenixBL - logger = bec_logger.logger -class LogTime(): +class LogTime: def __init__(self): - logger.success('init LogTime') - self.t0=time.time() + logger.success("init LogTime") + self.t0 = time.time() - def p_s(self,x): - now=time.time() - #delta=now-self.t0 - m=str(now)+' sec '+x - logger.success(m)custom_prepare_cls(parent=self, **kwargs) - # making the instance of PSID - #self.t0=now - file=open('MyLogfile.txt','a') - file.write(m+'\n') + def p_s(self, x): + now = time.time() + # delta=now-self.t0 + m = str(now) + " sec " + x + logger.success(m) + # making the instance of PSID + # self.t0=now + file = open("MyLogfile.txt", "a") + file.write(m + "\n") file.close - - - class PhoenixScanBaseTTL(ScanBase): """ Base scan cl p_s('init scrips.phoenix.scans.PhoenixLineScan') """ - - def scan_core(self): """perform the scan core procedure""" - self.p_s('PhoenixScanBaseTT.scan_core') + self.p_s("PhoenixScanBaseTT.scan_core") for ind, pos in self._get_position(): for self.burst_index in range(self.burst_at_each_point): - self.p_s('PhoenixScanBaseTT.scan_core in loop ') + self.p_s("PhoenixScanBaseTT.scan_core in loop ") yield from self._at_each_point(ind, pos) self.burst_index = 0 def _at_each_point(self, ind=None, pos=None): - self.p_s('PhoenixScanBaseTT._at_each_point') + self.p_s("PhoenixScanBaseTT._at_each_point") yield from self._move_scan_motors_and_wait(pos) if ind > 0: yield from self.stubs.wait( @@ -123,12 +120,11 @@ class PhoenixScanBaseTTL(ScanBase): ) self.point_id += 1 - self.p_s('done') + self.p_s("done") + class PhoenixLineScan(PhoenixScanBaseTTL): - - scan_name = "phoenix_line_scan" required_kwargs = ["steps", "relative"] arg_input = { @@ -149,25 +145,25 @@ class PhoenixLineScan(PhoenixScanBaseTTL): steps: int = None, relative: bool = False, burst_at_each_point: int = 1, - setup_device:str= None, + setup_device: str = None, **kwargs, ): """ - A phoenix line scan for one or more motors. + A phoenix line scan for one or more motors. - Args: - *args (Device, float, float): pairs of device / start position / end position - exp_time (float): exposure time in s. Default: 0 - steps (int): number of steps. Default: 10 - relative (bool): if True, the start and end positions are relative to the current position. Default: False - burst_a Specifies the level of type checking analysis to perform. -ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True) + Args: + *args (Device, float, float): pairs of device / start position / end position + exp_time (float): exposure time in s. Default: 0 + steps (int): number of steps. Default: 10 + relative (bool): if True, the start and end positions are relative to the current position. Default: False + burst_a Specifies the level of type checking analysis to perform. + ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True) """ - #from phoenix_bec.scripts.phoenix import PhoenixBL - self.p_s=PhoenixBL.my_log + # from phoenix_bec.scripts.phoenix import PhoenixBL + self.p_s = PhoenixBL.my_log - self.p_s('init scripts.phoenix.scans.PhoenixLineScan') + self.p_s("init scripts.phoenix.scans.PhoenixLineScan") super().__init__( exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs @@ -176,14 +172,13 @@ ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, rela self.setup_device = setup_device time.sleep(1) - self.p_s('done') + self.p_s("done") def _calculate_positions(self) -> None: - self.p_s('PhoenixLineScan._calculate_positions') + self.p_s("PhoenixLineScan._calculate_positions") axis = [] for _, val in self.caller_args.items(): ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float) axis.append(ax_pos) self.positions = np.array(list(zip(*axis)), dtype=float) - self.p_s('done') - + self.p_s("done") diff --git a/tests/tests_devices/test_phoenix_trigger.py b/tests/tests_devices/test_phoenix_trigger.py new file mode 100644 index 0000000..891ea0c --- /dev/null +++ b/tests/tests_devices/test_phoenix_trigger.py @@ -0,0 +1,97 @@ +import threading +import time +from unittest import mock + +import numpy as np +import ophyd +import pytest +from bec_server.device_server.tests.utils import DMMock +from ophyd_devices.interfaces.base_classes.psi_detector_base import DeviceTimeoutError +from ophyd_devices.tests.utils import MockPV, patch_dual_pvs + +from phoenix_bec.devices.phoenix_trigger import SAMPLING, PhoenixTrigger + + +@pytest.fixture(scope="function") +def mock_trigger(): + name = "phoenix_trigger" + prefix = "X07MB-OP2:" + dm = DMMock() + with mock.patch.object(dm, "connector"): + with ( + mock.patch( + "ophyd_devices.interfaces.base_classes.psi_detector_base.FileWriter" + ) as filemixin, + mock.patch( + "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" + ) as mock_service_config, + ): + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + with mock.patch.object(PhoenixTrigger, "_init"): + det = PhoenixTrigger(name=name, prefix=prefix, device_manager=dm) + patch_dual_pvs(det) + det.TIMEOUT_FOR_SIGNALS = 0.1 + yield det + + +def test_phoenix_trigger_init(mock_trigger): + """Test PhoenixTrigger init""" + assert mock_trigger.name == "phoenix_trigger" + assert mock_trigger.prefix == "X07MB-OP2:" + + +def test_phoenix_trigger_stage(mock_trigger): + """Test PhoenixTrigger on_stage""" + with mock.patch.object(mock_trigger.scaninfo, "load_scan_metadata") as mock_load_scan_metadata: + mock_trigger.scaninfo.scan_type = "step" + mock_trigger.scaninfo.exp_time = exp_time = 1 + mock_trigger.stage() + assert mock_load_scan_metadata.call_count == 1 + assert mock_trigger.start_csmpl.get() == 0 + assert mock_trigger.total_cycles.get() == np.ceil(exp_time * 5) + assert mock_trigger.smpl.get() == 1 + + +def test_phoenix_trigger_unstage(mock_trigger): + """Test PhoenixTrigger on_unstage""" + with mock.patch.object(mock_trigger.custom_prepare, "on_stop") as mock_on_stop: + mock_trigger.unstage() + assert mock_on_stop.call_count == 1 + + +def test_phoenix_trigger_stop(mock_trigger): + """Test PhoenixTrigger on_stop""" + with mock.patch.object(mock_trigger.smpl, "put") as mock_smpl_put: + mock_trigger.smpl_done._read_pv.mock_data = SAMPLING.RUNNING + mock_trigger.stop() + assert mock_trigger.stopped is True + assert mock_trigger.total_cycles.get() == 5 + assert mock_trigger.start_csmpl.get() == 1 + assert mock_smpl_put.call_args_list == [mock.call(1), mock.call(1)] + + +def test_phoenix_trigger_trigger(mock_trigger): + """Test PhoenixTrigger on_trigger + + First test that the trigger timeouts due to readback from smpl_done not being done. + Afterwards, check that status object resolved correctly if smpl_done is done. + """ + exp_time = 0.05 + mock_trigger.device_manager.add_device("falcon_nohdf5") + falcon_state = mock_trigger.device_manager.devices.falcon_nohdf5.state = mock.MagicMock() + falcon_state.get = mock.MagicMock(return_value=1) + mock_trigger.scaninfo.scan_type = "step" + mock_trigger.scaninfo.exp_time = exp_time + + with mock.patch.object( + mock_trigger.custom_prepare, "wait_with_status", return_value=mock.MagicMock() + ) as mock_wait_with_status: + status = mock_trigger.trigger() + assert mock_wait_with_status.call_count == 1 + assert mock_wait_with_status.call_args[1]["signal_conditions"] == [ + (mock_trigger.smpl_done.get, SAMPLING.DONE) + ] + assert mock_wait_with_status.call_args[1]["timeout"] == 5 * exp_time + assert mock_wait_with_status.call_args[1]["check_stopped"] is True