diff --git a/superxas_bec/device_configs/x10da_config_test.yaml b/superxas_bec/device_configs/x10da_config_test.yaml new file mode 100644 index 0000000..a9171de --- /dev/null +++ b/superxas_bec/device_configs/x10da_config_test.yaml @@ -0,0 +1,301 @@ + +####################################### +## Beam Monitors 2 and 3 -- Virtual positioners + +bm2_tr1: + readoutPriority: baseline + description: Beam Monitor 2 Translation 1 + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-OP-BM2:TR1 + onFailure: retry + enabled: true + softwareTrigger: false +bm2_tr2: + readoutPriority: baseline + description: Beam Monitor 2 Translation 2 + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-OP-BM2:TR2 + onFailure: retry + enabled: true + softwareTrigger: false +bm3_tr1: + readoutPriority: baseline + description: Beam Monitor 3 Translation 1 + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-OP-BM3:TR1 + onFailure: retry + enabled: true + softwareTrigger: false +bm3_tr2: + readoutPriority: baseline + description: Beam Monitor 3 Translation 2 + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-OP-BM3:TR2 + onFailure: retry + enabled: true + softwareTrigger: false +kb_slit_y: + readoutPriority: baseline + description: KB slit axis Y + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-SV1:OPENY + onFailure: retry + enabled: true + softwareTrigger: false + +##### Ionization chambers +ic1: + readoutPriority: monitored + description: Ionization Chamber 1 + deviceClass: ophyd.EpicsSignalRO + deviceConfig: + read_pv: X10DA-ES1-SAI_01:MEAN + auto_monitor: True + onFailure: raise + enabled: True + softwareTrigger: False +ic2: + readoutPriority: monitored + description: Ionization Chamber 2 + deviceClass: ophyd.EpicsSignalRO + deviceConfig: + read_pv: X10DA-ES1-SAI_02:MEAN + auto_monitor: True + onFailure: raise + enabled: True + softwareTrigger: False +ic3: + readoutPriority: monitored + description: Ionization Chamber 3 + deviceClass: ophyd.EpicsSignalRO + deviceConfig: + read_pv: X10DA-ES1-SAI_03:MEAN + auto_monitor: True + onFailure: raise + enabled: True + softwareTrigger: False +ic4: + readoutPriority: monitored + description: Ionization Chamber 4 + deviceClass: ophyd.EpicsSignalRO + deviceConfig: + read_pv: X10DA-ES1-SAI_04:MEAN + auto_monitor: True + onFailure: raise + enabled: True + softwareTrigger: False + +##### Trigger Card ##### + +trigger: + readoutPriority: baseline + description: Trigger Card + deviceClass: superxas_bec.devices.trigger.Trigger + deviceConfig: + prefix: 'X10DA-ES1:' + onFailure: raise + enabled: True + softwareTrigger: True + +##### Falcon detector ##### +falcon: + readoutPriority: monitored + description: Falcon Sitoro detector + deviceClass: superxas_bec.devices.falcon.FalconSuperXAS + deviceConfig: + prefix: 'X10DA-SITORO:' + onFailure: raise + enabled: True + softwareTrigger: False + + + +################################# +###### EXPERIMENTAL STATION ##### +################################# + +####################################### +## Harmonic Rejection Mirror -- Physical positioners + +hrm_try: + readoutPriority: baseline + description: Harmonic Rejection Mirror Y-Translation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-HRM:TRY + onFailure: retry + enabled: true + softwareTrigger: false +hrm_rotx: + readoutPriority: baseline + description: Harmonic Rejection Mirror X-Rotation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-HRM:ROX + onFailure: retry + enabled: true + softwareTrigger: false + + +####################################### +## Ionization Chambers 1, 2, and 3 -- Physical positioners + +ic1_try: + readoutPriority: baseline + description: Ionization Chamber 1 Y-Translation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-IC1:TRY + onFailure: retry + enabled: true + softwareTrigger: false +ic2_try: + readoutPriority: baseline + description: Ionization Chamber 2 Y-Translation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-IC2:TRY + onFailure: retry + enabled: true + softwareTrigger: false +ic3_try: + readoutPriority: baseline + description: Ionization Chamber 3 Y-Translation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-IC3:TRY + onFailure: retry + enabled: true + softwareTrigger: false + +####################################### +## Sample Manipulator (Old) -- Physical positioners + +ma1_trx: + readoutPriority: baseline + description: Sample Manipulator 1 X-Translation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-MA1:TRX + onFailure: retry + enabled: true + softwareTrigger: false +ma1_trx1: + readoutPriority: baseline + description: Sample Manipulator 1 X-Translation 1 + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-MA1:TRX1 + onFailure: retry + enabled: true + softwareTrigger: false +ma1_trx2: + readoutPriority: baseline + description: Sample Manipulator 1 X-Translation 2 + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-MA1:TRX2 + onFailure: retry + enabled: true + softwareTrigger: false +ma1_try: + readoutPriority: baseline + description: Sample Manipulator 1 Y-Translation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-MA1:TRY + onFailure: retry + enabled: true + softwareTrigger: false +# ma1_rot2: +# readoutPriority: baseline +# description: Sample Manipulator 1 Y-Rotation +# deviceClass: ophyd.EpicsMotor +# deviceConfig: +# prefix: X10DA-ES1-MA1:ROT2 +# onFailure: retry +# enabled: true +# softwareTrigger: false + + +####################################### +## Experimental Table 1 and 2 -- Physical positioners + +et1_trx: + readoutPriority: baseline + description: Experimental Table 1 X-Translation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-ET1:TRX + onFailure: retry + enabled: true + softwareTrigger: false +et1_try: + readoutPriority: baseline + description: Experimental Table 1 Y-Translation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-ET1:TRY + onFailure: retry + enabled: true + softwareTrigger: false +et2_trx: + readoutPriority: baseline + description: Experimental Table 2 X-Translation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES2-ET2:TRX + onFailure: retry + enabled: true + softwareTrigger: false +et2_try: + readoutPriority: baseline + description: Experimental Table 2 Y-Translation + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES2-ET2:TRY + onFailure: retry + enabled: true + softwareTrigger: false + +####################################### +## X-Ray Eye -- Physical positioners + +xe1_zoom: + readoutPriority: baseline + description: X-Ray Eye Zoom + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-XE1:ZOOM + onFailure: retry + enabled: true + softwareTrigger: false +xe1_finfoc: + readoutPriority: baseline + description: X-Ray Eye Fine Focus + deviceClass: ophyd.EpicsMotor + deviceConfig: + prefix: X10DA-ES1-XE1:FINFOC + onFailure: retry + enabled: true + softwareTrigger: false + +####################################### +## Optics + +## EPICS IOC does not seem to comply to MotorRecord + +mono_energy: + readoutPriority: baseline + description: X-Ray Eye Zoom + deviceClass: ophyd.PVPositioner + deviceConfig: + prefix: X10DA-OP1-MO1:BraggEAO + onFailure: retry + enabled: true + softwareTrigger: false diff --git a/superxas_bec/devices/falcon.py b/superxas_bec/devices/falcon.py new file mode 100644 index 0000000..1c12169 --- /dev/null +++ b/superxas_bec/devices/falcon.py @@ -0,0 +1,204 @@ +"""FALCON device implementation for SuperXAS""" + +import enum + +import numpy as np +from bec_lib.devicemanager import ScanInfo +from bec_lib.logger import bec_logger +from ophyd import Component as Cpt +from ophyd import DeviceStatus, Kind, Signal, StatusBase +from ophyd.status import SubscriptionStatus +from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + +logger = bec_logger.logger + + +class FalconAcquiringStatus(int, enum.Enum): + """Status of Falcon""" + + DONE = 0 + ACQUIRING = 1 + + +class DeadTimeCorrectedCounts(Signal): + """Signal to calculate dead time corrected counts""" + + def __init__(self, name: str, channel: int, **kwargs): + """ + Initialize DeadTimeCorrectedCounts signal. + + Args: + name (str): Name of the signal + channel (int): Channel number + """ + super().__init__(name=name, **kwargs) + self._channel = channel + self._dead_time = 1.182e-7 + + # pylint: disable=arguments-differ + def get(self) -> float: + """Get dead time corrected counts base on signals from dxp and mca of Falcon""" + dxp: EpicsDXPFalcon = getattr(self.parent, f"dxp{self._channel}") + mca: EpicsMCARecord = getattr(self.parent, f"mca{self._channel}") + + icr = dxp.input_count_rate.get() + ocr = dxp.output_count_rate.get() + roi = mca.rois.roi0.count.get() + ert = mca.elapsed_real_time.get() + print(icr, ocr, roi, ert) + + if icr == 0 or ocr == 0: + return 0 + + # Check that relative change is large enough + test = 1e9 + test_icr = icr + n = 0 + while test > self._dead_time and n < 30: + try: + true_icr = icr * np.exp(test_icr * self._dead_time) + test = (true_icr - test_icr) / test_icr + test_icr = true_icr + n += 1 + except Exception as e: # pylint: disable=broad-except + logger.info(f"Error in computation of signal {self.name}, error: {e}") + return 0 + + # Return corrected roi counts + cor_roi_cnts = 0 + if ocr * ert != 0: + cor_roi_cnts = roi * true_icr / (ocr * ert) + return cor_roi_cnts + + +class FalconControl(Falcon): + """Falcon Control class at SuperXAS. prefix: 'X10DA-SITORO:'""" + + _default_read_attrs = Falcon._default_read_attrs + ( + "dxp1", + # # "dxp2", + "mca1", + # # "mca2", + "dead_time_cor_cnts1", + # # "dead_time_cor_cnts2", + ) + _default_configuration_attrs = Falcon._default_configuration_attrs + ( + "dxp1", + # "dxp2", + "mca1", + # "mca2", + "dead_time_cor_cnts1", + # "dead_time_cor_cnts2", + ) + + # DXP parameters + dxp1 = Cpt(EpicsDXPFalcon, "dxp1:") + # dxp2 = Cpt(EpicsDXPFalcon, "dxp2:") + + # MCA record with spectrum data + mca1 = Cpt(EpicsMCARecord, "mca1") + # mca2 = Cpt(EpicsMCARecord, "mca2") + + # Norm Signal + dead_time_cor_cnts1 = Cpt( + DeadTimeCorrectedCounts, name="dead_time_cor_cnts", channel=1, kind=Kind.hinted + ) + # dead_time_cor_cnts2 = Cpt(DeadTimeCorrectedCounts, name='dead_time_cor_cnts', channel=2, kind=Kind.normal) + + +class FalconSuperXAS(PSIDeviceBase, FalconControl): + """Falcon implementierung at SuperXAS. prefix: 'X10DA-SITORO:'""" + + ######################################## + # Beamline Specific Implementations # + ######################################## + + def __init__(self, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): + """ + Initialize Falcon device. + + Args: + name (str): Name of the device + prefix (str): Prefix of the device + scan_info (ScanInfo): Information about the scan + **kwargs: Additional keyword arguments + """ + super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) + self._pv_timeout = 1 + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No signals are connected at this point. If you like to + set default values on signals, please use on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + + def on_stage(self) -> DeviceStatus | StatusBase | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. + """ + self.collect_mode.set(0).wait() + self.preset_real_time.set(0).wait() + self.stop_all.put(1) + if ( + self.wait_for_condition( + lambda: self.acquiring.get() == FalconAcquiringStatus.DONE, timeout=self._pv_timeout + ) + is False + ): + raise TimeoutError("Timeout on Falcon stage") + + def on_unstage(self) -> DeviceStatus | StatusBase | None: + """Called while unstaging the device.""" + self.stop_all.put(1) + self.erase_all.put(1) + if ( + self.wait_for_condition( + lambda: self.acquiring.get() == FalconAcquiringStatus.DONE, timeout=self._pv_timeout + ) + is False + ): + raise TimeoutError("Timeout on Falcon unstage") + + def on_pre_scan(self) -> DeviceStatus | StatusBase | None: + """Called right before the scan starts on all devices automatically.""" + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + """Called when the device is triggered.""" + + def on_complete(self) -> DeviceStatus | StatusBase | None: + """Called to inquire if a device has completed a scans.""" + + def on_kickoff(self) -> DeviceStatus | StatusBase | None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped.""" + self.stop_all.put(1) + + def _stop_erase_and_wait_for_acquiring(self) -> DeviceStatus: + """Method called from the Trigger card to reset counts on the Falcon""" + + if self.acquiring.get() != FalconAcquiringStatus.DONE: + self.stop_all.put(1) + + def _check_acquiriting(*, old_value, value, **kwargs): + if old_value == FalconAcquiringStatus.DONE and value == FalconAcquiringStatus.ACQUIRING: + return True + return False + + status = SubscriptionStatus(self.acquiring, _check_acquiriting) + + logger.info("Triggering Falcon") + self.erase_start.put(1) + return status diff --git a/superxas_bec/devices/trigger.py b/superxas_bec/devices/trigger.py new file mode 100644 index 0000000..21e7c06 --- /dev/null +++ b/superxas_bec/devices/trigger.py @@ -0,0 +1,139 @@ +"""SuperXAS Trigger Device""" + +from bec_lib.logger import bec_logger +from ophyd import Component as Cpt +from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, StatusBase + +logger = bec_logger.logger + +import enum + +from bec_lib.devicemanager import ScanInfo +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + + +class ContinuousSamplingMode(int, enum.Enum): + """Options for start_csmpl signal""" + + OFF = 0 + ON = 1 + + +class SamplingDone(int, enum.Enum): + """Status of sampling""" + + RUNNING = 0 + DONE = 1 + + +class TriggerControl(Device): + """Trigger Device Control PVs at X10DA, prefix: X10DA-ES1:""" + + total_cycles = Cpt( + EpicsSignal, + suffix="TOTAL-CYCLES", + kind=Kind.config, + doc="Number of cycles (multiplies by 0.2s)", + ) + start_csmpl = Cpt( + EpicsSignal, suffix="START-CSMPL", kind=Kind.config, doc="Continous sampling mode on/off" + ) + smpl = Cpt( + EpicsSignal, suffix="SMPL", kind=Kind.config, doc="Sampling Trigger if cont mode is off" + ) + smpl_done = Cpt( + EpicsSignalRO, suffix="SMPL-DONE", kind=Kind.config, doc="Done status of trigger" + ) + + +class Trigger(PSIDeviceBase, TriggerControl): + """Trigger Device of X10DA (SUPERXAS), prefix: X10DA-ES1:""" + + def __init__( + self, + name: str, + prefix: str = "", + scan_info: ScanInfo | None = None, + device_manager=None, + **kwargs, + ): + super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) + self.device_manager = device_manager + self._pv_timeout = 1 + + ######################################## + # Beamline Specific Implementations # + ######################################## + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No signals are connected at this point. If you like to + set default values on signals, please use on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + + def on_stage(self) -> DeviceStatus | StatusBase | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. + """ + self.start_csmpl.set(ContinuousSamplingMode.OFF).wait(timeout=self._pv_timeout) + exp_time = self.scan_info.msg.scan_parameters["exp_time"] + if self.scan_info.msg.scan_name != "exafs_scan": + self.set_exposure_time(exp_time).wait() + + def on_unstage(self) -> DeviceStatus | StatusBase | None: + """Called while unstaging the device.""" + status = self.start_csmpl.set(ContinuousSamplingMode.ON) + return status + + def on_pre_scan(self) -> DeviceStatus | StatusBase | None: + """Called right before the scan starts on all devices automatically.""" + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + """Called when the device is triggered.""" + falcon = self.device_manager.devices.get("falcon", None) + + if falcon is not None: + # pylint: disable=protected-access + status = falcon._stop_erase_and_wait_for_acquiring() + status.wait() + + started = False + + def _sampling_done(): + nonlocal started + if not started and self.smpl_done.get() == SamplingDone.RUNNING: + started = True + return False + if started and self.smpl_done.get() == SamplingDone.DONE: + return True + + return self.smpl_done.get() == SamplingDone.DONE + + self.smpl.put(1) + status = self.task_handler.submit_task(_sampling_done, run=True) + return status + + def on_complete(self) -> DeviceStatus | StatusBase | None: + """Called to inquire if a device has completed a scans.""" + + def on_kickoff(self) -> DeviceStatus | StatusBase | None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped.""" + self.task_handler.shutdown() + + def set_exposure_time(self, value: float) -> DeviceStatus: + """Utility method to set exposure time complying to device logic with cycle of min 0.2s.""" + cycles = max(int(value * 5), 1) + return self.total_cycles.set(cycles) diff --git a/superxas_bec/scans/__init__.py b/superxas_bec/scans/__init__.py index e69de29..95eb5ae 100644 --- a/superxas_bec/scans/__init__.py +++ b/superxas_bec/scans/__init__.py @@ -0,0 +1 @@ +from .exafs_scan import EXAFSScan \ No newline at end of file diff --git a/superxas_bec/scans/exafs_scan.py b/superxas_bec/scans/exafs_scan.py new file mode 100644 index 0000000..a73e0f2 --- /dev/null +++ b/superxas_bec/scans/exafs_scan.py @@ -0,0 +1,131 @@ +import time + +import numpy as np +from bec_lib.device import DeviceBase +from bec_lib.logger import bec_logger +from bec_server.scan_server.scans import ScanBase + +logger = bec_logger.logger + + +class EXAFSScan(ScanBase): + scan_name = "exafs_scan" + + def __init__( + self, + edge_energy: float, + xas_rel_range: list[float] | np.ndarray[float] | None = None, + n_points: list[int] | np.ndarray[int] | None = None, + k_step: list[bool] | np.ndarray[bool] | None = None, + integ_time: list[float] | np.ndarray[float] | None = None, + motor: DeviceBase | None = None, + settling_time: float = 0.2, + **kwargs, + ): + """ + EXAFS Scan of the mono_energy axix + + Args: + edge_energy (float) : Adsorption Edge Energy + xas_rel_range (list[float] | np.ndarray[int] | None) : Optinoal, relative range for XAS, Length of list must n_points +1 + n_points (list[int] | np.ndarray[bool] | None) : Optional, number of points per range + ...#TODO docstring + """ + self.edge_energy = edge_energy + self.xas_rel_range = xas_rel_range + self.n_points = n_points + self.k_step = k_step + self.integ_time = integ_time + self.k_step_conversion = 3.81 + self._check_and_upated_input_arguments() + if motor is None: + default_motor = "kb_slit_y" + motor = default_motor # TODO Remove that motor, put energy of mono + self.motor = motor + super().__init__(exp_time=0, relative=False, settling_time=settling_time, **kwargs) + + # Check that trigger device is enabled + _dev_trigger_name = "trigger" + self._dev_trigger: DeviceBase = self.device_manager.devices.get(_dev_trigger_name, None) + if self._dev_trigger is None or self._dev_trigger.enabled == False: + raise ValueError( + f"Trigger device not found or not enabled in devicemanager {self._dev_trigger}" + ) + # update scan parameters + self.scan_parameters["edge_energy"] = self.edge_energy + self.scan_parameters["xas_rel_range"] = self.xas_rel_range + self.scan_parameters["n_points"] = self.n_points + self.scan_parameters["k_step"] = self.k_step + self.scan_parameters["integ_time"] = self.integ_time + # update readout_priority + self.readout_priority = {"monitored": [self.motor]} + + def update_scan_motors(self): + self.scan_motors = [self.motor] + + def _check_and_upated_input_arguments(self) -> None: + """ + Input parameters for the EXAFS scan must be of the same length for n_points, k_step, integ_time + and xas_rel_range (-1). This methods checks for this condition, and calculates the integration time + for each point in the scan. If the input parameters are not provided with the correct length, + it raises a ValueError which indicates which parameters are not matching. + """ + + if not all( + [ + len(self.n_points) == len(self.k_step), + len(self.n_points) == len(self.integ_time), + len(self.n_points) == (len(self.xas_rel_range) - 1), # careful -1 + ] + ): + raise ValueError( + f"Input parameters must have matching lengths:\n" + f"n_points: {len(self.n_points)}, " + f"k_step: {len(self.k_step)}, expected: {len(self.n_points)}, " + f"integ_time: {len(self.integ_time)}, expected: {len(self.n_points)}, " + f"xas_rel_range: {len(self.xas_rel_range)}, expected: {len(self.n_points)-1} " + ) + + self.integ_time = np.repeat(np.array(self.integ_time), np.array(self.n_points)) + + def _set_position_offset(self): + """Do not set offset""" + yield None + + def _calculate_positions(self): + positions = [] + for ii, pnts in enumerate(self.n_points): + if self.k_step[ii] is False: + positions.extend( + np.linspace( + self.xas_rel_range[ii], self.xas_rel_range[ii + 1], pnts, endpoint=False + ).tolist() + ) + else: + k_start = np.sqrt(self.xas_rel_range[ii] / self.k_step_conversion) + k_stop = np.sqrt(self.xas_rel_range[ii + 1] / self.k_step_conversion) + k_pos = np.linspace(k_start, k_stop, pnts, endpoint=False) + k_pos = k_pos**2 * self.k_step_conversion + positions.extend(k_pos.tolist()) + + # Create positions array + self.positions = np.vstack(positions) + # shift by edge energy + self.positions = self.positions + self.edge_energy + # Convert to keV + self.positions = self.positions / 1e3 + + def _at_each_point(self, ind=None, pos=None): + yield from self._move_scan_motors_and_wait(pos) + time.sleep(self.settling_time) + trigger_time = self.integ_time[ind] + self.stubs.send_rpc_and_wait(self._dev_trigger, "set_exposure_time", trigger_time) + + # Trigger + yield from self.stubs.trigger(min_wait=trigger_time) + + # Readout all monitored devices + yield from self.stubs.read(group="monitored", point_id=self.point_id) + + # Increase point id + self.point_id += 1 diff --git a/tests/tests_devices/test_devices_falcon.py b/tests/tests_devices/test_devices_falcon.py new file mode 100644 index 0000000..edea7f0 --- /dev/null +++ b/tests/tests_devices/test_devices_falcon.py @@ -0,0 +1,103 @@ +"""Tests for Falcon device.""" + +import threading +from unittest import mock + +import ophyd +import pytest +from ophyd_devices.tests.utils import MockPV, patch_dual_pvs + +from superxas_bec.devices.falcon import FalconAcquiringStatus, FalconSuperXAS + +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name + + +@pytest.fixture(scope="function") +def falcon(): + """Trigger device with mocked EPICS PVs.""" + name = "falcon" + prefix = "X10DA-SITORO:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = FalconSuperXAS(name=name, prefix=prefix) + patch_dual_pvs(dev) + yield dev + + +def test_devices_falcon(falcon): + """Test init and on_connected methods of Falcon device""" + + assert falcon.prefix == "X10DA-SITORO:" + assert falcon.name == "falcon" + assert falcon._pv_timeout == 1 + falcon.on_connected() + + +def test_devices_falcon_stage(falcon): + """Test on_stage method of Falcon device""" + + falcon.collect_mode.put(1) + falcon.preset_real_time.put(1) + falcon.stop_all.put(0) + falcon.acquiring.put(FalconAcquiringStatus.DONE) + # Should resolve with that status + falcon.on_stage() + assert falcon.collect_mode.get() == 0 + assert falcon.preset_real_time.get() == 0 + assert falcon.stop_all.get() == 1 + # Should timeout + falcon.acquiring.put(FalconAcquiringStatus.ACQUIRING) + falcon._pv_timeout = 0.1 + with pytest.raises(TimeoutError): + falcon.on_stage() + + +def test_devices_falcon_unstage(falcon): + """Test on_unstage method of Falcon device""" + + falcon.stop_all.put(0) + falcon.erase_all.put(0) + falcon.acquiring.put(FalconAcquiringStatus.DONE) + # Should resolve with that status + falcon.on_unstage() + assert falcon.stop_all.get() == 1 + assert falcon.erase_all.get() == 1 + # Should timeout + falcon.acquiring.put(FalconAcquiringStatus.ACQUIRING) + falcon._pv_timeout = 0.1 + with pytest.raises(TimeoutError): + falcon.on_unstage() + + +def test_devices_falcon_stop(falcon): + """Test on_stop method of Falcon device""" + assert falcon.stopped is False + falcon.stop_all.put(0) + falcon.stop() + assert falcon.stopped is True + assert falcon.stop_all.get() == 1 + + +def test_devices_falcon_stop_erase_and_wait_for_acquiring(falcon): + """ + Test _stop_erase_and_wait_for_acquiring method of Falcon device. + + This method is called by the trigger card when the Falcon needs to be reset, and + placed in a state where it can receive a trigger again + """ + # Set initial values to different states + falcon.stop_all.put(0) + falcon.erase_start.put(0) + + falcon.acquiring.put(FalconAcquiringStatus.ACQUIRING) + # If falcon status is acquiring, it should call stop_all + status = falcon._stop_erase_and_wait_for_acquiring() + assert falcon.stop_all.get() == 1 + assert falcon.erase_start.get() == 1 + # The status resolved once it sees acquiring change from DONE to ACQUIRING + assert status.done is False + falcon.acquiring.put(FalconAcquiringStatus.DONE) + falcon.acquiring.set(FalconAcquiringStatus.ACQUIRING).wait() + assert status.done is True diff --git a/tests/tests_devices/test_devices_trigger.py b/tests/tests_devices/test_devices_trigger.py new file mode 100644 index 0000000..6da34ea --- /dev/null +++ b/tests/tests_devices/test_devices_trigger.py @@ -0,0 +1,106 @@ +"""Tests for Trigger device.""" + +import threading +from unittest import mock + +import ophyd +import pytest +from bec_server.device_server.tests.utils import DMMock +from ophyd import DeviceStatus +from ophyd_devices.tests.utils import MockPV, patch_dual_pvs + +from superxas_bec.devices.trigger import ContinuousSamplingMode, Trigger + +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name + + +@pytest.fixture(scope="function") +def trigger(): + """Trigger device with mocked EPICS PVs.""" + name = "trigger" + prefix = "X10DA-ES1:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = Trigger(name=name, prefix=prefix, device_manager=DMMock()) + patch_dual_pvs(dev) + yield dev + + +@pytest.mark.parametrize(["exp_time", "cycles"], [(0.1, 1), (0.5, 2), (2, 10)]) +def test_devices_trigger_stage_core_scans(trigger, exp_time, cycles): + """Test on_connected method of Trigger device. + + The pytest.mark.parametrize decorator is used to run the test for each parameter in the list. + """ + assert trigger.prefix == "X10DA-ES1:" + assert trigger.name == "trigger" + assert trigger._pv_timeout == 1 + # + trigger.on_connected() + trigger._pv_timeout = 0.2 + trigger.start_csmpl.put(ContinuousSamplingMode.ON) + assert trigger.start_csmpl.get() == ContinuousSamplingMode.ON + + # Set scan_info information for scan + trigger.scan_info.msg.scan_name = "step_scan" + trigger.scan_info.msg.scan_parameters["exp_time"] = exp_time + + # On stage should set exposure time + status = trigger.stage() + if isinstance(status, DeviceStatus): + status.wait() + assert trigger.start_csmpl.get() == ContinuousSamplingMode.OFF + # cycles will be multiple of exp_time/0.2 as int, minimum 1. + assert trigger.total_cycles.get() == cycles + + +def test_devices_trigger_unstage(trigger): + """ + Test on_unstage method of Trigger device. + + This should put start_csmpl to ON. + """ + trigger.start_csmpl.put(ContinuousSamplingMode.OFF) + assert trigger.start_csmpl.get() == ContinuousSamplingMode.OFF + status = trigger.unstage() + status.wait() + assert trigger.start_csmpl.get() == ContinuousSamplingMode.ON + + +def test_devices_trigger_stop(trigger): + """ + Test on_stop method of Trigger device. + + This should stop the task_handler. + """ + assert trigger.stopped is False + with mock.patch.object(trigger, "task_handler") as mock_handler: + trigger.stop() + mock_handler.shutdown.assert_called_once() + assert trigger.stopped is True + + +def test_devices_trigger_trigger(trigger): + """Test trigger method of Trigger device.""" + # TODO we should use the ScanStatusMessage to update scan_info here + falcon = mock.MagicMock() + falcon.name.return_value = "falcon" + status = DeviceStatus(device=falcon) + status.set_finished() + falcon._stop_erase_and_wait_for_acquiring.return_value = status + trigger.device_manager.devices["falcon"] = falcon + + trigger_status = DeviceStatus(device=trigger) + trigger_status.set_finished() + with mock.patch.object( + trigger.task_handler, "submit_task", return_value=trigger_status + ) as mock_submit: + status = trigger.trigger() + assert falcon._stop_erase_and_wait_for_acquiring.call_count == 1 + assert trigger.smpl.get() == 1 # smpl called with 1 + # TODO check that the task_handler is called with the correct function + # This is currently not easily testable + assert mock_submit.call_count == 1 + assert trigger_status == status