diff --git a/README.md b/README.md index ee37dc1..7795fdc 100644 --- a/README.md +++ b/README.md @@ -4,18 +4,30 @@ Debye-specific plugins and configs for BEC ## How to -### Open visual studio code +### Visual studio code +To open ``` ssh x01da-bec-001 cd /data/test/x01da-test-bec/bec_deployment code ``` +To run tests directly in vs code terminal +``` +. /data/test/x01da-test-bec/bec_deployment/bec_venv/bin/activate +cd /data/test/x01da-test-bec/bec_deployment/debye_bec +pytest -vv ./tests +``` ### Git ``` git pull git push origin feat/add_advanced_scan_modes git status ``` +If git claims to not know the author identity +``` +git config --global user.email "you@example.com" +git config --global user.name "gac-x01da" +``` ### BEC Server ``` diff --git a/debye_bec/devices/mo1_bragg.py b/debye_bec/devices/mo1_bragg.py index 1855701..9694652 100644 --- a/debye_bec/devices/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg.py @@ -1,23 +1,20 @@ """ Module for the Mo1 Bragg positioner of the Debye beamline. -The soft IOC is reachable via the EPICS prefix X01DA-OP-MO1:BRAGG: and connected +The softIOC is reachable via the EPICS prefix X01DA-OP-MO1:BRAGG: and connected to a motor controller via web sockets. The Mo1 Bragg positioner is not only a positioner, but also a scan controller to setup XAS and XRD scans. A few scan modes are programmed in the controller, e.g. simple and advanced XAS scans + XRD triggering mode. -Note: For some of the Epics PVs, in particular action buttons, the suffix .PROC and -put_complete=True is used to ensure that the action is executed completely. This is believed +Note: For some of the Epics PVs, in particular action buttons, the put_complete=True is +used to ensure that the action is executed completely. This is believed to allow for a more stable execution of the action.""" import enum import threading import time import traceback -from typeguard import typechecked -from debye_bec.devices.utils.mo1_bragg_utils import compute_spline from dataclasses import dataclass from typing import Literal - from bec_lib.logger import bec_logger from ophyd import Component as Cpt from ophyd import ( @@ -31,13 +28,14 @@ from ophyd import ( Signal, Staged, ) +from typeguard import typechecked from ophyd.utils import LimitError from ophyd_devices.utils import bec_scaninfo_mixin, bec_utils from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError +from debye_bec.devices.utils.mo1_bragg_utils import compute_spline logger = bec_logger.logger - class ScanControlScanStatus(int, enum.Enum): """Enum class for the scan status of the Bragg positioner""" @@ -64,7 +62,7 @@ class ScanControlLoadMessage(int, enum.Enum): ERR_SCAN_TIME = 11 ERR_SCAN_VEL_TOO_HI = 12 ERR_SCAN_ANGLE_OUT_OF_LIM = 13 - ERR_SCAN_HIGH_VEL_LAR_42 = 14 + ERR_SCAN_HIGH_VEL_LAR_42 = 14 ERR_SCAN_MODE_INVALID = 15 @@ -172,6 +170,8 @@ class Mo1BraggScanSettings(Device): a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="config", auto_monitor=True) class Mo1BraggCalculator(Device): + """Mo1 Bragg PVs to convert angle to energy or vice-versa.""" + calc_reset = Cpt(EpicsSignal, suffix="calc_reset", kind="config", put_complete=True) calc_done = Cpt(EpicsSignalRO, suffix="calc_done_RBV", kind="config") calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config") @@ -268,7 +268,7 @@ class Mo1Bragg(Device, PositionerBase): velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True) # Angle PVs - # TODO makd angle motion a pseudo motor + # TODO make angle motion a pseudo motor feedback_pos_angle = Cpt( EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True ) @@ -331,7 +331,8 @@ class Mo1Bragg(Device, PositionerBase): self.scan_control.scan_progress.subscribe(self._progress_update, run=False) def _progress_update(self, value, **kwargs) -> None: - """Callback method to update the scan progress, runs a callback to SUB_PROGRESS subscribers, i.e. BEC. + """Callback method to update the scan progress, runs a callback + to SUB_PROGRESS subscribers, i.e. BEC. Args: value (int) : current progress value @@ -405,7 +406,8 @@ class Mo1Bragg(Device, PositionerBase): # pylint: disable=arguments-differ def check_value(self, value: float) -> None: - """Method to check if a value is within limits of the positioner. Called by PositionerBase.move() + """Method to check if a value is within limits of the positioner. + Called by PositionerBase.move() Args: value (float) : value to move axis to. @@ -423,15 +425,19 @@ class Mo1Bragg(Device, PositionerBase): status: DeviceStatus, update_frequency: float = 0.1, ) -> None: - """Method to be called in the move thread to move the Bragg positioner to the target position. + """Method to be called in the move thread to move the Bragg positioner + to the target position. Args: target_pos (float) : target position for the motion move_cpt (Cpt) : component to set the target position on the IOC, - either setpoint or setpoint_abs_angle depending on the move type - read_cpt (Cpt) : component to read the current position of the motion, readback or feedback_pos_angle + either setpoint or setpoint_abs_angle depending + on the move type + read_cpt (Cpt) : component to read the current position of the motion, + readback or feedback_pos_angle status (DeviceStatus) : status object to set the status of the motion - update_frequency (float): Optional, frequency to update the current position of the motion, defaults to 0.1s + update_frequency (float): Optional, frequency to update the current position of + the motion, defaults to 0.1s """ try: # Set the target position on IOC @@ -452,11 +458,13 @@ class Mo1Bragg(Device, PositionerBase): status.set_exception(exc=exc) def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus: - """Move the Bragg positioner to the specified value, allows to switch between move types angle and energy. + """Move the Bragg positioner to the specified value, allows to + switch between move types angle and energy. Args: value (float) : target value for the motion - move_type (str | MoveType) : Optional, specify the type of move, either 'energy' or 'angle' + move_type (str | MoveType) : Optional, specify the type of move, + either 'energy' or 'angle' Returns: DeviceStatus : status object to track the motion @@ -535,7 +543,11 @@ class Mo1Bragg(Device, PositionerBase): self.scan_settings.s_scan_scantime.put(scan_time) @typechecked - def convert_angle_energy(self, mode:Literal["AngleToEnergy", "EnergyToAngle"], inp:float) -> float: + def convert_angle_energy( + self, + mode:Literal["AngleToEnergy", "EnergyToAngle"], + inp:float + ) -> float: """Calculate energy to angle or vice versa Args: @@ -545,7 +557,6 @@ class Mo1Bragg(Device, PositionerBase): Returns: output (float): Converted angle or energy """ - # TODO the calc_reset should reset from the IOC itself, check EPICS implementation with Alvin/Xiaoqiang self.calculator.calc_reset.put(0) self.calculator.calc_reset.put(1) @@ -576,7 +587,14 @@ class Mo1Bragg(Device, PositionerBase): elif mode == "EnergyToAngle": return self.calculator.calc_angle.get() - def set_advanced_xas_settings(self, low: float, high:float, scan_time: float, p_kink: float, e_kink: float) -> None: + def set_advanced_xas_settings( + self, + low: float, + high:float, + scan_time: float, + p_kink: float, + e_kink: float + ) -> None: """Set Advanced XAS parameters for upcoming scan. Args: @@ -586,9 +604,9 @@ class Mo1Bragg(Device, PositionerBase): p_kink (float): Position of kink in % e_kink (float): Energy of kink in eV """ - ## TODO Add fallback solution for automatic testing, otherwise test will fail - ## because no monochromator will calculate the angle - ## Unsure how to implement this + # TODO Add fallback solution for automatic testing, otherwise test will fail + # because no monochromator will calculate the angle + # Unsure how to implement this move_type = self.move_type.get() if move_type == MoveType.ENERGY: @@ -599,7 +617,13 @@ class Mo1Bragg(Device, PositionerBase): else: raise Mo1BraggError("MoveType Angle not implemented for advanced scans, use Energy") - pos, vel, dt = compute_spline(low_deg=low_deg, high_deg=high_deg, p_kink =p_kink, e_kink_deg = e_kink_deg, scan_time=scan_time) + pos, vel, dt = compute_spline( + low_deg=low_deg, + high_deg=high_deg, + p_kink =p_kink, + e_kink_deg = e_kink_deg, + scan_time=scan_time, + ) self.scan_settings.a_scan_pos.set(pos) self.scan_settings.a_scan_vel.set(vel) @@ -661,7 +685,7 @@ class Mo1Bragg(Device, PositionerBase): def kickoff(self): """Kickoff the device, called from BEC.""" scan_duration = self.scan_control.scan_duration.get() - # TODO implement better logic for infinite scans, at least bring it up with Debye + #TODO implement better logic for infinite scans, at least bring it up with Debye start_func = ( self.scan_control.scan_start_infinite.put if scan_duration < 0.1 @@ -825,7 +849,8 @@ class Mo1Bragg(Device, PositionerBase): def complete(self) -> DeviceStatus: """Complete the acquisition. - The method returns a DeviceStatus object that resolves to set_finished or set_exception once the acquisition is completed. + The method returns a DeviceStatus object that resolves to set_finished + or set_exception once the acquisition is completed. """ status = self.on_complete() if isinstance(status, DeviceStatus): @@ -898,11 +923,14 @@ class Mo1Bragg(Device, PositionerBase): For EPICs PVs, an example usage is pasted at the bottom. Args: - signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + signal_conditions (list[tuple]): tuple of executable calls for conditions + (get_current_state, condition) to check timeout (float): timeout in seconds - check_stopped (bool): True if stopped flag should be checked. The function relies on the self.stopped property to be set + check_stopped (bool): True if stopped flag should be checked. + The function relies on the self.stopped property to be set interval (float): interval in seconds - all_signals (bool): True if all signals should be True, False if any signal should be True + all_signals (bool): True if all signals should be True, + False if any signal should be True Returns: bool: True if all signals are in the desired state, False if timeout is reached @@ -936,15 +964,17 @@ class Mo1Bragg(Device, PositionerBase): exception_on_timeout: Exception = None, ) -> DeviceStatus: """Wrapper around wait_for_signals to be started in thread and attach a DeviceStatus object. - This allows BEC to perform actinos in parallel and not be blocked by method calls on a device. - Typically used for on_trigger, on_complete methods or also the kickoff. + This allows BEC to perform actinos in parallel and not be blocked by method + calls on a device. Typically used for on_trigger, on_complete methods or also the kickoff. Args: - signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + signal_conditions (list[tuple]): tuple of executable calls for conditions + (get_current_state, condition) to check timeout (float): timeout in seconds check_stopped (bool): True if stopped flag should be checked interval (float): interval in seconds - all_signals (bool): True if all signals should be True, False if any signal should be True + all_signals (bool): True if all signals should be True, + False if any signal should be True exception_on_timeout (Exception): Exception to raise on timeout Returns: @@ -970,11 +1000,13 @@ class Mo1Bragg(Device, PositionerBase): Args: status (DeviceStatus): DeviceStatus object to be set - signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + signal_conditions (list[tuple]): tuple of executable calls for + conditions (get_current_state, condition) to check timeout (float): timeout in seconds check_stopped (bool): True if stopped flag should be checked interval (float): interval in seconds - all_signals (bool): True if all signals should be True, False if any signal should be True + all_signals (bool): True if all signals should be True, False if + any signal should be True exception_on_timeout (Exception): Exception to raise on timeout """ try: diff --git a/debye_bec/devices/utils/mo1_bragg_utils.py b/debye_bec/devices/utils/mo1_bragg_utils.py index 5f8deb9..8ec5ae8 100644 --- a/debye_bec/devices/utils/mo1_bragg_utils.py +++ b/debye_bec/devices/utils/mo1_bragg_utils.py @@ -1,19 +1,29 @@ +""" Module for additional utils of the Mo1 Bragg Positioner""" + import numpy as np from scipy.interpolate import BSpline ################ Define Constants ############ SAFETY_FACTOR = 0.025 # safety factor to limit acceleration -> NEVER SET TO ZERO ! -N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number, otherwise peak value will not be included +N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number, + # otherwise peak value will not be included DEGREE_SPLINE = 3 # DEGREE_SPLINE of spline, 3 works good TIME_COMPENSATE_SPLINE = 0.0062 # time to be compensated each spline in s -POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values as used on ACS controller for simple scans +POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values + # as used on ACS controller for simple scans class Mo1UtilsSplineError(Exception): """ Exception for spline computation""" -def compute_spline(low_deg:float, high_deg:float, p_kink:float, e_kink_deg:float, scan_time:float) -> tuple[float, float, float]: +def compute_spline( + low_deg:float, + high_deg:float, + p_kink:float, + e_kink_deg:float, + scan_time:float, +) -> tuple[float, float, float]: """ Spline computation for the advanced scan mode Args: @@ -32,16 +42,27 @@ def compute_spline(low_deg:float, high_deg:float, p_kink:float, e_kink_deg:float high_deg = high_deg + POSITION_COMPONSATION if p_kink < 0 or p_kink > 100: - raise Mo1UtilsSplineError(f"Kink position not within range of [0..100%] for p_kink: {p_kink}") + raise Mo1UtilsSplineError("Kink position not within range of [0..100%]"+ + f"for p_kink: {p_kink}") if e_kink_deg < low_deg or e_kink_deg > high_deg: - raise Mo1UtilsSplineError(f"Kink energy not within selected energy range of scan, for e_kink_deg {e_kink_deg}, low_deg {low_deg} and high_deg {high_deg}.") + raise Mo1UtilsSplineError("Kink energy not within selected energy range of scan,"+ + f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and"+ + f"high_deg {high_deg}.") tc1 = SAFETY_FACTOR / scan_time * TIME_COMPENSATE_SPLINE t_kink = (scan_time - TIME_COMPENSATE_SPLINE - 2*(SAFETY_FACTOR - tc1)) * p_kink/100 + (SAFETY_FACTOR - tc1) - t_input = [0, SAFETY_FACTOR - tc1, t_kink , scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1, scan_time - TIME_COMPENSATE_SPLINE ] - p_input = [0, 0 , e_kink_deg - low_deg , high_deg - low_deg , high_deg - low_deg] + t_input = [0, + SAFETY_FACTOR - tc1, + t_kink, + scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1, + scan_time - TIME_COMPENSATE_SPLINE] + p_input = [0, + 0, + e_kink_deg - low_deg, + high_deg - low_deg, + high_deg - low_deg] cv = np.stack((t_input, p_input)).T # spline coefficients max_param = len(cv) - DEGREE_SPLINE @@ -58,10 +79,10 @@ def compute_spline(low_deg:float, high_deg:float, p_kink:float, e_kink_deg:float acc = [] for item in a: - acc.append(0) if item[1] == 0 else acc.append(item[1]/item[0]) + acc.append(0) if item[1] == 0 else acc.append(item[1]/item[0]) jerk = [] for item in j: - jerk.append(0) if item[1] == 0 else jerk.append(item[1]/item[0]) + jerk.append(0) if item[1] == 0 else jerk.append(item[1]/item[0]) dt = np.zeros(len(tim)) for i in np.arange(len(tim)): @@ -70,4 +91,4 @@ def compute_spline(low_deg:float, high_deg:float, p_kink:float, e_kink_deg:float else: dt[i] = 1000*(tim[i]-tim[i-1]) - return pos, vel, dt \ No newline at end of file + return pos, vel, dt diff --git a/debye_bec/scans/mono_bragg_scans.py b/debye_bec/scans/mono_bragg_scans.py index b2bf87e..856abe0 100644 --- a/debye_bec/scans/mono_bragg_scans.py +++ b/debye_bec/scans/mono_bragg_scans.py @@ -8,6 +8,7 @@ from bec_server.scan_server.scans import AsyncFlyScanBase class XASSimpleScan(AsyncFlyScanBase): + """Class for the XAS simple scan""" scan_name = "xas_simple_scan" scan_type = "fly" @@ -30,15 +31,17 @@ class XASSimpleScan(AsyncFlyScanBase): **kwargs, ): """The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor. - Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration - is the duration of the scan. If scan duration is set to 0, the scan will run infinitely. + Start and Stop define the energy range for the scan, scan_time is the time for one scan + cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the + scan will run infinitely. Args: start (float): Start energy for the scan. stop (float): Stop energy for the scan. scan_time (float): Time for one scan cycle. scan_duration (float): Duration of the scan. - motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg". + motor (DeviceBase, optional): Motor device to be used for the scan. + Defaults to "mo1_bragg". Examples: >>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10) """ @@ -60,7 +63,8 @@ class XASSimpleScan(AsyncFlyScanBase): yield None def pre_scan(self): - """Pre Scan action. Ensure the motor movetype is set to energy, then check limits for start/end energy. + """Pre Scan action. Ensure the motor movetype is set to energy, then check + limits for start/end energy. #TODO Remove once the motor movetype is removed and ANGLE motion is a pseudo motor. """ yield from self.stubs.send_rpc_and_wait(self.motor, "move_type.set", "energy") @@ -89,7 +93,11 @@ class XASSimpleScan(AsyncFlyScanBase): while True: # Readout monitored devices - yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary", point_id=self.point_id) + yield from self.stubs.read_and_wait( + group="primary", + wait_group="readout_primary", + point_id=self.point_id, + ) # Check if complete call on Mo1 Bragg has been finished status = self.stubs.get_req_status( device=self.motor, RID=self.metadata["RID"], DIID=target_diid @@ -103,6 +111,8 @@ class XASSimpleScan(AsyncFlyScanBase): class XASSimpleScanWithXRD(XASSimpleScan): + """Class for the XAS simple scan with XRD""" + scan_name = "xas_simple_scan_with_xrd" gui_config = { "Movement Parameters": ["start", "stop"], @@ -140,13 +150,16 @@ class XASSimpleScanWithXRD(XASSimpleScan): xrd_enable_low (bool): Enable XRD triggering for the low energy range. num_trigger_low (int): Number of triggers for the low energy range. exp_time_low (float): Exposure time for the low energy range. - cycle_low (int): Specify how often the triggers should be considered, every nth cycle for low + cycle_low (int): Specify how often the triggers should be considered, + every nth cycle for low xrd_enable_high (bool): Enable XRD triggering for the high energy range. num_trigger_high (int): Number of triggers for the high energy range. exp_time_high (float): Exposure time for the high energy range. - cycle_high (int): Specify how often the triggers should be considered, every nth cycle for high - motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg". - + cycle_high (int): Specify how often the triggers should be considered, + every nth cycle for high + motor (DeviceBase, optional): Motor device to be used for the scan. + Defaults to "mo1_bragg". + Examples: >>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000) """ @@ -168,7 +181,8 @@ class XASSimpleScanWithXRD(XASSimpleScan): self.cycle_high = cycle_high class XASAdvancedScan(XASSimpleScan): - + """Class for the XAS advanced scan""" + scan_name = "xas_advanced_scan" gui_config = { "Movement Parameters": ["start", "stop"], @@ -188,9 +202,11 @@ class XASAdvancedScan(XASSimpleScan): **kwargs, ): """The xas_advanced_scan is an oscillation motion on the mono motor. - Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration - is the duration of the scan. If scan duration is set to 0, the scan will run infinitely. - p_kink and e_kink add a kink to the motion profile to slow down in the exafs region of the scan. + Start and Stop define the energy range for the scan, scan_time is the + time for one scan cycle and scan_duration is the duration of the scan. + If scan duration is set to 0, the scan will run infinitely. + p_kink and e_kink add a kink to the motion profile to slow down in the + exafs region of the scan. Args: start (float): Start angle for the scan. @@ -199,7 +215,8 @@ class XASAdvancedScan(XASSimpleScan): scan_duration (float): Total duration of the scan. p_kink (float): Position of the kink. e_kink (float): Energy of the kink. - motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg". + motor (DeviceBase, optional): Motor device to be used for the scan. + Defaults to "mo1_bragg". Examples: >>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500) @@ -216,7 +233,8 @@ class XASAdvancedScan(XASSimpleScan): self.e_kink = e_kink class XASAdvancedScanWithXRD(XASAdvancedScan): - + """Class for the XAS advanced scan with XRD""" + scan_name = "xas_advanced_scan_with_xrd" gui_config = { "Movement Parameters": ["start", "stop"], @@ -247,9 +265,10 @@ class XASAdvancedScanWithXRD(XASAdvancedScan): ): """The xas_advanced_scan is an oscillation motion on the mono motor with XRD triggering at low and high energy ranges. - Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration - is the duration of the scan. If scan duration is set to 0, the scan will run infinitely. - p_kink and e_kink add a kink to the motion profile to slow down in the exafs region of the scan. + Start and Stop define the energy range for the scan, scan_time is the time for + one scan cycle and scan_duration is the duration of the scan. If scan duration + is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the + motion profile to slow down in the exafs region of the scan. Args: start (float): Start angle for the scan. @@ -261,12 +280,15 @@ class XASAdvancedScanWithXRD(XASAdvancedScan): xrd_enable_low (bool): Enable XRD triggering for the low energy range. num_trigger_low (int): Number of triggers for the low energy range. exp_time_low (float): Exposure time for the low energy range. - cycle_low (int): Specify how often the triggers should be considered, every nth cycle for low + cycle_low (int): Specify how often the triggers should be considered, + every nth cycle for low xrd_enable_high (bool): Enable XRD triggering for the high energy range. num_trigger_high (int): Number of triggers for the high energy range. exp_time_high (float): Exposure time for the high energy range. - cycle_high (int): Specify how often the triggers should be considered, every nth cycle for high - motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg". + cycle_high (int): Specify how often the triggers should be considered, + every nth cycle for high + motor (DeviceBase, optional): Motor device to be used for the scan. + Defaults to "mo1_bragg". Examples: >>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000) @@ -290,4 +312,4 @@ class XASAdvancedScanWithXRD(XASAdvancedScan): self.xrd_enable_high = xrd_enable_high self.num_trigger_high = num_trigger_high self.exp_time_high = exp_time_high - self.cycle_high = cycle_high \ No newline at end of file + self.cycle_high = cycle_high diff --git a/tests/tests_devices/test_mo1_bragg.py b/tests/tests_devices/test_mo1_bragg.py index 25d5f74..90f11f9 100644 --- a/tests/tests_devices/test_mo1_bragg.py +++ b/tests/tests_devices/test_mo1_bragg.py @@ -57,7 +57,7 @@ def test_init(mock_bragg): assert dev.prefix == "X01DA-OP-MO1:BRAGG:" assert dev.move_type.get() == MoveType.ENERGY assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV" - assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs.PROC" + assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs" def test_check_value(mock_bragg): @@ -149,24 +149,6 @@ def test_set_xas_settings(mock_bragg): assert dev.scan_settings.s_scan_angle_hi.get() == 20 assert dev.scan_settings.s_scan_scantime.get() == 1 -def test_set_advanced_xas_settings(mock_bragg): - dev = mock_bragg - dev.move_type.set(MoveType.ENERGY) - dev.set_advanced_xas_settings(low=10000, high=12000, scan_time=0.5) - p_prev = 0 - for p in dev.scan_settings.a_scan_pos.get(): - assert p > 0 and p < 40, "maximum positions of monochromator exceeded" - assert p >= p_prev, "values in a_scan_pos must be in ascending order" - p_prev = p - for v in dev.scan_settings.a_scan_vel.get(): - assert v > -120 and v < 120, "maximum speed of monochromator exceeded" - t_tot - for t in dev.scan_settings.a_scan_time.get(): - assert t >= 0, "time interval must be greater then 0" - t_tot = t_tot + t - assert t_tot <= 0.5, "total time must be smaller or equal to scan_time" - # TODO add test for MoveType.Angle (or delete MoveType.Angle) - def test_set_xrd_settings(mock_bragg): dev = mock_bragg dev.set_xrd_settings( @@ -188,7 +170,6 @@ def test_set_xrd_settings(mock_bragg): assert dev.scan_settings.xrd_every_n_lo.get() == 1 assert dev.scan_settings.xrd_every_n_hi.get() == 5 - def test_set_control_settings(mock_bragg): dev = mock_bragg dev.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=10) @@ -505,6 +486,8 @@ def test_stage(mock_bragg, scan_worker_mock, msg): low=scan_status_msg.content["info"]["kwargs"]["start"], high=scan_status_msg.content["info"]["kwargs"]["stop"], scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], + p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"], + e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"], ) assert mock_xrd_settings.call_args == mock.call( enable_low=False, @@ -529,6 +512,8 @@ def test_stage(mock_bragg, scan_worker_mock, msg): low=scan_status_msg.content["info"]["kwargs"]["start"], high=scan_status_msg.content["info"]["kwargs"]["stop"], scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"], + p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"], + e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"], ) assert mock_xrd_settings.call_args == mock.call( enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"], diff --git a/tests/tests_devices/test_mo1_bragg_utils.py b/tests/tests_devices/test_mo1_bragg_utils.py new file mode 100644 index 0000000..0c5ae56 --- /dev/null +++ b/tests/tests_devices/test_mo1_bragg_utils.py @@ -0,0 +1,22 @@ +# pylint: skip-file +import debye_bec.devices.utils.mo1_bragg_utils as utils +import numpy as np + +def test_compute_spline(): + p, v, dt = utils.compute_spline(low_deg=10, high_deg=12, p_kink=50, e_kink_deg=11, scan_time=0.5) + + rtol = 1e-6 + atol = 1e-3 + p_desired = [9.98,9.98376125,9.99479,10.01270375,10.03712,10.06765625,10.10393,10.14555875,10.19216,10.24335125,10.29875,10.35797375,10.42064,10.48636625,10.55477,10.62546875,10.69808,10.77222125,10.84751,10.92356375,11.,11.07643625,11.15249,11.22777875,11.30192,11.37453125,11.44523,11.51363375,11.57936,11.64202625,11.70125,11.75664875,11.80784,11.85444125,11.89607,11.93234375,11.96288,11.98729625,12.00521,12.01623875,12.02] + v_desired = [0.,1.50156441,2.35715667,2.90783907,3.29035796,3.57019636,3.78263174,3.9483388,4.08022441,4.18675043,4.27368333,4.34507577,4.40384627,4.45213618,4.49153736,4.52324148,4.54814006,4.5668924,4.57997194,4.58769736,4.59025246,4.58769736,4.57997194,4.5668924,4.54814006,4.52324148,4.49153736,4.45213618,4.40384627,4.34507577,4.27368333,4.18675043,4.08022441,3.9483388,3.78263174,3.57019636,3.29035796,2.90783907,2.35715667,1.50156441,0.] + dt_desired = [0.,4.34081063,5.57222438,6.73882688,7.84061813,8.87759812,9.84976688,10.75712437,11.59967063,12.37740563,13.09032937,13.73844188,14.32174313,14.84023312,15.29391188,15.68277937,16.00683562,16.26608063,16.46051438,16.59013687,16.65494813,16.65494813,16.59013687,16.46051438,16.26608063,16.00683562,15.68277938,15.29391188,14.84023312,14.32174313,13.73844187,13.09032938,12.37740562,11.59967063,10.75712437,9.84976687,8.87759813,7.84061812,6.73882688,5.57222437,4.34081063] + + np.testing.assert_allclose(p, p_desired, rtol, atol) + np.testing.assert_allclose(v, v_desired, rtol, atol) + np.testing.assert_allclose(dt, dt_desired, rtol, atol) + + assert(utils.SAFETY_FACTOR == 0.025) + assert(utils.N_SAMPLES == 41) + assert(utils.DEGREE_SPLINE == 3) + assert(utils.TIME_COMPENSATE_SPLINE == 0.0062) + assert(utils.POSITION_COMPONSATION == 0.02) \ No newline at end of file diff --git a/tests/tests_scans/test_mono_bragg_scans.py b/tests/tests_scans/test_mono_bragg_scans.py index 3ed6464..fa94b51 100644 --- a/tests/tests_scans/test_mono_bragg_scans.py +++ b/tests/tests_scans/test_mono_bragg_scans.py @@ -106,7 +106,7 @@ def test_xas_simple_scan(): parameter={}, ), DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, + metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0}, device=None, action="read", parameter={"group": "primary", "wait_group": "readout_primary"}, @@ -118,7 +118,7 @@ def test_xas_simple_scan(): parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"}, ), DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, + metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0}, device=None, action="read", parameter={"group": "primary", "wait_group": "readout_primary"}, @@ -267,7 +267,7 @@ def test_xas_simple_scan_with_xrd(): parameter={}, ), DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, + metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0}, device=None, action="read", parameter={"group": "primary", "wait_group": "readout_primary"}, @@ -279,7 +279,7 @@ def test_xas_simple_scan_with_xrd(): parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"}, ), DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "RID": "my_test_request_id"}, + metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0}, device=None, action="read", parameter={"group": "primary", "wait_group": "readout_primary"}, @@ -361,7 +361,7 @@ def test_xas_advanced_scan(): "async": [], }, "num_points": None, - "positions": [8000, 9000], + "positions": [8000.0, 9000.0], "scan_name": "xas_advanced_scan", "scan_type": "fly", }, @@ -408,7 +408,7 @@ def test_xas_advanced_scan(): parameter={}, ), DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id"}, + metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id", "point_id": 0}, device=None, action="read", parameter={"group": "primary", "wait_group": "readout_primary"}, @@ -420,7 +420,7 @@ def test_xas_advanced_scan(): parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"}, ), DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id"}, + metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id", "point_id": 0}, device=None, action="read", parameter={"group": "primary", "wait_group": "readout_primary"}, @@ -516,7 +516,7 @@ def test_xas_advanced_scan_with_xrd(): "async": [], }, "num_points": None, - "positions": [8000, 9000], + "positions": [8000.0, 9000.0], "scan_name": "xas_advanced_scan_with_xrd", "scan_type": "fly", }, @@ -563,7 +563,7 @@ def test_xas_advanced_scan_with_xrd(): parameter={}, ), DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id"}, + metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id", "point_id": 1}, device=None, action="read", parameter={"group": "primary", "wait_group": "readout_primary"}, @@ -575,7 +575,7 @@ def test_xas_advanced_scan_with_xrd(): parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"}, ), DeviceInstructionMessage( - metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id"}, + metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id", "point_id": 1}, device=None, action="read", parameter={"group": "primary", "wait_group": "readout_primary"},