Feature/phoenix trigger #4

Merged
appel_c merged 4 commits from feature/phoenix_trigger into main 2024-08-28 16:54:36 +02:00
5 changed files with 241 additions and 159 deletions

View File

@@ -1,2 +1 @@
from .phoenix_trigger import PhoenixTrigger
from .dummy_devices import Dummy_PSIDetector

View File

@@ -13,26 +13,27 @@ from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import FileWriter
from bec_lib.logger import bec_logger
from ophyd import Component, Device, DeviceStatus, Kind
from ophyd import Component
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO
from ophyd import FormattedComponent as FCpt
from ophyd import Kind
from ophyd.device import Staged
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from ophyd_devices.sim.sim_signals import SetableSignal
from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from ophyd import Component as Cpt
from ophyd import FormattedComponent as FCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO
from phoenix_bec.scripts.phoenix import PhoenixBL
logger = bec_logger.logger
#class LogTime():
# class LogTime():
# def __init__(self):
# self.t0=time.time()
@@ -48,8 +49,8 @@ logger = bec_logger.logger
# file.close
p_s = PhoenixBL.my_log
p_s=PhoenixBL.my_log
class DetectorInitError(Exception):
"""Raised when initiation of the device class fails,
@@ -74,8 +75,10 @@ class SetupDummy(CustomDetectorMixin):
self.parent = parent
def on_init(self) -> None:
"""
def on_stage(self) -> None:e is writing data on disk, this step should include publishing
""" """
def on_stage(self) -> None:
"""e is writing data on disk, this step should include publishing
a file_event and file_message to BEC to inform the system where the data is written to.
IMPORTANT:
@@ -203,27 +206,27 @@ class SetupDummy(CustomDetectorMixin):
exception_on_timeout: Exception = None,
) -> DeviceStatus:
"""Utility function to wait for signals in a thread.
Returns a DevicesStatus object that resolves either to set_finished or set_exception.
The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase.
Returns a DevicesStatus object that resolves either to set_finished or set_exception.
The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase.
Usage:
This function should be used to wait for signals to reach a certain condition, especially in the context of
on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
Usage:
This function should be used to wait for signals to reach a certain condition, especially in the context of
on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value
rue if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value
rue if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
Returns:
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
Returns:
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
"""
if exception_on_timeout is None:
exception_on_timeout = DeviceTimeoutError(
@@ -308,19 +311,17 @@ class Dummy_PSIDetector(PSIDetectorBase):
filepath = Component(SetableSignal, value="", kind=Kind.config)
custom_prepare_cls = SetupDummy
custom_prepare_cls = SetupDummy
#prefix=X07MB-PC-PSCAN
D = Cpt(EpicsSignal, 'P-P0D0') # cont on / off
# prefix=X07MB-PC-PSCAN
D = Cpt(EpicsSignal, "P-P0D0") # cont on / off
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
self.p_s=PhoenixBL.my_log #must be before super!!!
self.p_s = PhoenixBL.my_log # must be before super!!!
self.p_s('Dummy_device Dummy_PSIDetector.__init__ ')
self.p_s("Dummy_device Dummy_PSIDetector.__init__ ")
super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs)
@@ -345,28 +346,25 @@ class Dummy_PSIDetector(PSIDetectorBase):
self._update_scaninfo()
self._update_filewriter()
self._init()
#.. prepare my own log file
self.p_s('Dummy_device Dummy_PSIDetector.__init__ .. done ')
# .. prepare my own log file
self.p_s("Dummy_device Dummy_PSIDetector.__init__ .. done ")
def _update_filewriter(self) -> None:
"""Update filewriter with service config"""
self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter')
self.p_s("Dummy_device Dummy_PSIDetector._update_filewriter")
self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector)
self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter .. done ')
self.p_s("Dummy_device Dummy_PSIDetector._update_filewriter .. done ")
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo')
self.p_s("Dummy_device Dummy_PSIDetector._update_scaninfo")
self.scaninfo = BecScaninfoMixin(self.device_manager)
self.scaninfo.load_scan_metadata()
self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo .. done ')
self.p_s("Dummy_device Dummy_PSIDetector._update_scaninfo .. done ")
def _update_service_config(self) -> None:
"""Update service config from BEC service config
@@ -376,32 +374,31 @@ class Dummy_PSIDetector(PSIDetectorBase):
# pylint: disable=import-outside-toplevel
from bec_lib.bec_service import SERVICE_CONFIG
self.p_s('Dummy_device Dummy_PSIDetector._update_service_config')
self.p_s("Dummy_device Dummy_PSIDetector._update_service_config")
if SERVICE_CONFIG:
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
return
self.service_cfg = {"base_path": os.path.abspath(".")}
self.p_s('Dummy_device Dummy_PSIDetector._update_service_config .. done')
self.p_s("Dummy_device Dummy_PSIDetector._update_service_config .. done")
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id')
self.p_s("Dummy_device Dummy_PSIDetector.check_scan_id")
old_scan_id = self.scaninfo.scan_id
self.scaninfo.load_scan_metadata()
if self.scaninfo.scan_id != old_scan_id:
self.stopped = True
self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id .. done ')
self.p_s("Dummy_device Dummy_PSIDetector.check_scan_id .. done ")
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
self.p_s('Dummy_device Dummy_PSIDetector._init')
self.p_s("Dummy_device Dummy_PSIDetector._init")
self.custom_prepare.on_init()
self.p_s('Dummy_device Dummy_PSIDetector._init ... done ')
self.p_s("Dummy_device Dummy_PSIDetector._init ... done ")
def stage(self) -> list[object]:
"""
@@ -414,15 +411,14 @@ class Dummy_PSIDetector(PSIDetectorBase):
list(object): list of objects that were staged
"""
self.p_s('Dummy_device Dummy_PSIDetector.stage')
self.p_s("Dummy_device Dummy_PSIDetector.stage")
if self._staged != Staged.no:
return super().stage()
self.stopped = False
self.scaninfo.load_scan_metadata()
self.custom_prepare.on_stage()
self.p_s('Dummy_device Dummy_PSIDetector.stage done ')
self.p_s("Dummy_device Dummy_PSIDetector.stage done ")
return super().stage()
@@ -433,22 +429,21 @@ class Dummy_PSIDetector(PSIDetectorBase):
time-critical actions. Therefore, it should also be kept as short/fast as possible.
I.e. Arming a detector in case there is a risk of timing out.
"""
self.p_s('Dummy_device Dummy_PSIDetector.pre_scan')
self.p_s("Dummy_device Dummy_PSIDetector.pre_scan")
self.custom_prepare.on_pre_scan()
self.p_s('Dummy_device Dummy_PSIDetector.pre_scan .. done ')
self.p_s("Dummy_device Dummy_PSIDetector.pre_scan .. done ")
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
# pylint: disable=assignment-from-no-return
self.p_s('Dummy_device Dummy_PSIDetector.trigger')
self.p_s("Dummy_device Dummy_PSIDetector.trigger")
status = self.custom_prepare.on_trigger()
if isinstance(status, DeviceStatus):
return status
self.p_s('Dummy_device Dummy_PSIDetector.trigger.. done ')
self.p_s("Dummy_device Dummy_PSIDetector.trigger.. done ")
return super().trigger()
@@ -461,14 +456,14 @@ class Dummy_PSIDetector(PSIDetectorBase):
Actions are implemented in custom_prepare.on_complete since they are beamline specific.
"""
# pylint: disable=assignment-from-no-return
self.p_s('Dummy_device Dummy_PSIDetector.complete')
self.p_s("Dummy_device Dummy_PSIDetector.complete")
status = self.custom_prepare.on_complete()
if isinstance(status, DeviceStatus):
return status
status = DeviceStatus(self)
status.set_finished()
self.p_s('Dummy_device Dummy_PSIDetector.complete ... done ')
self.p_s("Dummy_device Dummy_PSIDetector.complete ... done ")
return status
@@ -484,11 +479,11 @@ class Dummy_PSIDetector(PSIDetectorBase):
Returns:
list(object): list of objects that were unstaged
"""
self.p_s('Dummy_device Dummy_PSIDetector.unstage')
self.p_s("Dummy_device Dummy_PSIDetector.unstage")
self.check_scan_id()
self.custom_prepare.on_unstage()
self.stopped = False
self.p_s('Dummy_device Dummy_PSIDetector.unstage .. done')
self.p_s("Dummy_device Dummy_PSIDetector.unstage .. done")
return super().unstage()
@@ -497,9 +492,8 @@ class Dummy_PSIDetector(PSIDetectorBase):
Stop the scan, with camera and file writer
"""
self.p_s('Dummy_device Dummy_PSIDetector.stop')
self.p_s("Dummy_device Dummy_PSIDetector.stop")
self.custom_prepare.on_stop()
super().stop(success=success)
self.stopped = True
self.p_s('Dummy_device Dummy_PSIDetector.stop ... done')
self.p_s("Dummy_device Dummy_PSIDetector.stop ... done")

View File

@@ -1,18 +1,18 @@
import time
""" Module for the PhoenixTrigger class to connect to the ADC card
that creates TTL signals to trigger cameras and detectors at Phoenix. """
import enum
import numpy as np
import time
import numpy as np
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, EpicsSignal, EpicsSignalRO, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
PSIDetectorBase,
CustomDetectorMixin,
PSIDetectorBase,
)
from bec_lib import bec_logger
logger = bec_logger.logger
DETECTOR_TIMEOUT = 5
@@ -22,7 +22,7 @@ class PhoenixTriggerError(Exception):
"""PhoenixTrigger specific error"""
class SAMPLINGDONE(int, enum.Enum):
class SAMPLING(int, enum.Enum):
"""Sampling Done PV"""
RUNNING = 0
@@ -31,13 +31,11 @@ class SAMPLINGDONE(int, enum.Enum):
class PhoenixTriggerSetup(CustomDetectorMixin):
"""
This defines the PHOENIX trigger setup.
Mixin Class to setup the PhoenixTrigger device
"""
def on_stage(self) -> None:
"""Actions to take place on stage"""
"""On stage actions which are executed upon staging the device"""
if self.parent.scaninfo.scan_type == "step":
self.parent.start_csmpl.set(0)
self.parent.total_cycles.set(1)
@@ -47,19 +45,21 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
logger.info(f"Device {self.parent.name} was staged for step scan")
def on_unstage(self) -> None:
"""Actions to take place on unstage"""
"""On unstage actions which are executed upon unstaging the device"""
self.on_stop()
def on_trigger(self) -> DeviceStatus:
"""Actions to be performed upon receiving a software trigger"""
"""On trigger actions which are executed upon triggering the device"""
# TODO Test the proper check for the falcon state
# Check first that falcon is set to acquiring
falcon = self.parent.device_manager.devices.get("falcon_nohdf5", None)
timeout = 1
if falcon is not None:
if self.wait_for_signals([(falcon.state.get, 1)], timeout=1):
# TODO Check that falcon.state.get() == 1 is the correct check. --> When is the falcon acquiring, this assumes 1?
if not self.wait_for_signals([(falcon.state.get, 1)], timeout=timeout):
raise PhoenixTriggerError(
f"Falcon not ready to take trigger after 1s timeout in trigger"
f"Device {self.parent.name} is not ready to take trigger, timeout due to waiting for Falcon to get ready. Timeout after {timeout}s"
)
falcon.state.get() == 1 # Acquiring
if self.parent.scaninfo.scan_type == "step":
time.sleep(0.2)
self.parent.smpl.put(1)
@@ -67,7 +67,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
time.sleep(0.2)
# Trigger function from ophyd.Device returns a DeviceStatus. This function
# starts a process that creates a DeviceStatus, and waits for the signal_conditions
# self.parent.smpl_done.get to change to the value SAMPLINGDONE.DONE
# self.parent.smpl_done.get to change to the value SAMPLING.DONE
# Once this takes place, the DeviceStatus.done flag will be set to True.
# When BEC calls trigger() on the devices, this method will be called assuming that
# the devices config softwareTrigger=True is set.
@@ -75,7 +75,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
# self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time)
# which ensures that the DeviceStatus object resolves before continuing, i.e. DeviceStatus.done = True
status = self.wait_with_status(
signal_conditions=[(self.parent.smpl_done.get, SAMPLINGDONE.DONE)],
signal_conditions=[(self.parent.smpl_done.get, SAMPLING.DONE)],
timeout=5 * self.parent.scaninfo.exp_time, # Check if timeout is appropriate
check_stopped=True,
)
@@ -83,24 +83,23 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
def on_stop(self) -> None:
"""Actions to stop the Device"""
# Put the Device in cont mode
# Put the Device again in continous acquisition mode
self.parent.total_cycles.set(5)
self.parent.start_csmpl.set(1)
self.parent.smpl.put(1)
time.sleep(0.5)
self.parent.smpl.put(1)
time.sleep(0.2)
if self.parent.smpl_done.get() == SAMPLINGDONE.RUNNING:
if self.parent.smpl_done.get() == SAMPLING.RUNNING:
return
self.parent.smpl.put(1)
class PhoenixTrigger(PSIDetectorBase):
"""
Docstring:
Class for PHOENIX TTL hardware trigger (X07MB-OP2:)
Class for PHOENIX TTL hardware trigger: 'X07MB-OP2:'
This device is used to trigger communicate with an ADC card that creates TTL signals to trigger cameras and detectors at Phoenix.
"""
custom_prepare_cls = PhoenixTriggerSetup
@@ -123,15 +122,13 @@ class PhoenixTrigger(PSIDetectorBase):
if __name__ == "__main__":
# Test the PhoenixTrigger class
trigger = PhoenixTrigger(name="trigger", prefix="X07MB-OP2:")
trigger.wait_for_connection(all_signals=True)
trigger.read()
trigger.read_configuration()
trigger.stage()
status = trigger.trigger()
while status.done is False:
print(f" Waiting for status, flag is {status.done}")
time.sleep(0.2)
device_status = trigger.trigger()
device_status.wait()
trigger.unstage()

View File

@@ -23,90 +23,87 @@ but they are executed in a specific order:
"""
# imports in ScanBase
#from __future__ import annotations
# from __future__ import annotations
#import ast
#import enum
#import threading
#import time
#import uuid
#from abc import ABC, abstractmethod
#from typing import Any, Literal
# import ast
# import enum
# import threading
# import time
# import uuid
# from abc import ABC, abstractmethod
# from typing import Any, Literal
#import numpy as np
# import numpy as np
#from bec_lib.device import DeviceBase
#from bec_lib.devicemanager import DeviceManagerBase
#from bec_lib.endpoints import MessageEndpoints
#from bec_lib.logger import bec_logger
# from bec_lib.device import DeviceBase
# from bec_lib.devicemanager import DeviceManagerBase
# from bec_lib.endpoints import MessageEndpoints
# from bec_lib.logger import bec_logger
#from .errors import LimitError, ScanAbortion
#from .path_optimization import PathOptimizerMixin
#from .scan_stubs import ScanStubs
# from .errors import LimitError, ScanAbortion
# from .path_optimization import PathOptimizerMixin
# from .scan_stubs import ScanStubs
# end imports in ScanBase
# import time
# import numpy as np
import time
import numpy as np
# from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import ScanArgType, ScanBase
from phoenix_bec.scripts.phoenix import PhoenixBL
# from bec_lib import messages
# from bec_server.scan_server.errors import ScanAbortion
# from bec_server.scan_server.scans import FlyScanBase, RequestBase, ScanArgType, ScanBase
# logger = bec_logger.logger
from bec_server.scan_server.scans import ScanBase, ScanArgType
import numpy as np
import time
from bec_lib.logger import bec_logger
from phoenix_bec.scripts.phoenix import PhoenixBL
logger = bec_logger.logger
class LogTime():
class LogTime:
def __init__(self):
logger.success('init LogTime')
self.t0=time.time()
logger.success("init LogTime")
self.t0 = time.time()
def p_s(self,x):
now=time.time()
#delta=now-self.t0
m=str(now)+' sec '+x
logger.success(m)custom_prepare_cls(parent=self, **kwargs)
# making the instance of PSID
#self.t0=now
file=open('MyLogfile.txt','a')
file.write(m+'\n')
def p_s(self, x):
now = time.time()
# delta=now-self.t0
m = str(now) + " sec " + x
logger.success(m)
# making the instance of PSID
# self.t0=now
file = open("MyLogfile.txt", "a")
file.write(m + "\n")
file.close
class PhoenixScanBaseTTL(ScanBase):
"""
Base scan cl p_s('init scrips.phoenix.scans.PhoenixLineScan')
"""
def scan_core(self):
"""perform the scan core procedure"""
self.p_s('PhoenixScanBaseTT.scan_core')
self.p_s("PhoenixScanBaseTT.scan_core")
for ind, pos in self._get_position():
for self.burst_index in range(self.burst_at_each_point):
self.p_s('PhoenixScanBaseTT.scan_core in loop ')
self.p_s("PhoenixScanBaseTT.scan_core in loop ")
yield from self._at_each_point(ind, pos)
self.burst_index = 0
def _at_each_point(self, ind=None, pos=None):
self.p_s('PhoenixScanBaseTT._at_each_point')
self.p_s("PhoenixScanBaseTT._at_each_point")
yield from self._move_scan_motors_and_wait(pos)
if ind > 0:
yield from self.stubs.wait(
@@ -123,12 +120,11 @@ class PhoenixScanBaseTTL(ScanBase):
)
self.point_id += 1
self.p_s('done')
self.p_s("done")
class PhoenixLineScan(PhoenixScanBaseTTL):
scan_name = "phoenix_line_scan"
required_kwargs = ["steps", "relative"]
arg_input = {
@@ -149,25 +145,25 @@ class PhoenixLineScan(PhoenixScanBaseTTL):
steps: int = None,
relative: bool = False,
burst_at_each_point: int = 1,
setup_device:str= None,
setup_device: str = None,
**kwargs,
):
"""
A phoenix line scan for one or more motors.
A phoenix line scan for one or more motors.
Args:
*args (Device, float, float): pairs of device / start position / end position
exp_time (float): exposure time in s. Default: 0
steps (int): number of steps. Default: 10
relative (bool): if True, the start and end positions are relative to the current position. Default: False
burst_a Specifies the level of type checking analysis to perform.
ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)
Args:
*args (Device, float, float): pairs of device / start position / end position
exp_time (float): exposure time in s. Default: 0
steps (int): number of steps. Default: 10
relative (bool): if True, the start and end positions are relative to the current position. Default: False
burst_a Specifies the level of type checking analysis to perform.
ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)
"""
#from phoenix_bec.scripts.phoenix import PhoenixBL
self.p_s=PhoenixBL.my_log
# from phoenix_bec.scripts.phoenix import PhoenixBL
self.p_s = PhoenixBL.my_log
self.p_s('init scripts.phoenix.scans.PhoenixLineScan')
self.p_s("init scripts.phoenix.scans.PhoenixLineScan")
super().__init__(
exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs
@@ -176,14 +172,13 @@ ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, rela
self.setup_device = setup_device
time.sleep(1)
self.p_s('done')
self.p_s("done")
def _calculate_positions(self) -> None:
self.p_s('PhoenixLineScan._calculate_positions')
self.p_s("PhoenixLineScan._calculate_positions")
axis = []
for _, val in self.caller_args.items():
ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float)
axis.append(ax_pos)
self.positions = np.array(list(zip(*axis)), dtype=float)
self.p_s('done')
self.p_s("done")

View File

@@ -0,0 +1,97 @@
import threading
import time
from unittest import mock
import numpy as np
import ophyd
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.interfaces.base_classes.psi_detector_base import DeviceTimeoutError
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from phoenix_bec.devices.phoenix_trigger import SAMPLING, PhoenixTrigger
@pytest.fixture(scope="function")
def mock_trigger():
name = "phoenix_trigger"
prefix = "X07MB-OP2:"
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
mock.patch(
"ophyd_devices.interfaces.base_classes.psi_detector_base.FileWriter"
) as filemixin,
mock.patch(
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
) as mock_service_config,
):
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(PhoenixTrigger, "_init"):
det = PhoenixTrigger(name=name, prefix=prefix, device_manager=dm)
patch_dual_pvs(det)
det.TIMEOUT_FOR_SIGNALS = 0.1
yield det
def test_phoenix_trigger_init(mock_trigger):
"""Test PhoenixTrigger init"""
assert mock_trigger.name == "phoenix_trigger"
assert mock_trigger.prefix == "X07MB-OP2:"
def test_phoenix_trigger_stage(mock_trigger):
"""Test PhoenixTrigger on_stage"""
with mock.patch.object(mock_trigger.scaninfo, "load_scan_metadata") as mock_load_scan_metadata:
mock_trigger.scaninfo.scan_type = "step"
mock_trigger.scaninfo.exp_time = exp_time = 1
mock_trigger.stage()
assert mock_load_scan_metadata.call_count == 1
assert mock_trigger.start_csmpl.get() == 0
assert mock_trigger.total_cycles.get() == np.ceil(exp_time * 5)
assert mock_trigger.smpl.get() == 1
def test_phoenix_trigger_unstage(mock_trigger):
"""Test PhoenixTrigger on_unstage"""
with mock.patch.object(mock_trigger.custom_prepare, "on_stop") as mock_on_stop:
mock_trigger.unstage()
assert mock_on_stop.call_count == 1
def test_phoenix_trigger_stop(mock_trigger):
"""Test PhoenixTrigger on_stop"""
with mock.patch.object(mock_trigger.smpl, "put") as mock_smpl_put:
mock_trigger.smpl_done._read_pv.mock_data = SAMPLING.RUNNING
mock_trigger.stop()
assert mock_trigger.stopped is True
assert mock_trigger.total_cycles.get() == 5
assert mock_trigger.start_csmpl.get() == 1
assert mock_smpl_put.call_args_list == [mock.call(1), mock.call(1)]
def test_phoenix_trigger_trigger(mock_trigger):
"""Test PhoenixTrigger on_trigger
First test that the trigger timeouts due to readback from smpl_done not being done.
Afterwards, check that status object resolved correctly if smpl_done is done.
"""
exp_time = 0.05
mock_trigger.device_manager.add_device("falcon_nohdf5")
falcon_state = mock_trigger.device_manager.devices.falcon_nohdf5.state = mock.MagicMock()
falcon_state.get = mock.MagicMock(return_value=1)
mock_trigger.scaninfo.scan_type = "step"
mock_trigger.scaninfo.exp_time = exp_time
with mock.patch.object(
mock_trigger.custom_prepare, "wait_with_status", return_value=mock.MagicMock()
) as mock_wait_with_status:
status = mock_trigger.trigger()
assert mock_wait_with_status.call_count == 1
assert mock_wait_with_status.call_args[1]["signal_conditions"] == [
(mock_trigger.smpl_done.get, SAMPLING.DONE)
]
assert mock_wait_with_status.call_args[1]["timeout"] == 5 * exp_time
assert mock_wait_with_status.call_args[1]["check_stopped"] is True