Moved spline computation to separate file

Added tests for spline computation
Refactoring
This commit is contained in:
gac-x01da
2024-09-13 08:48:44 +02:00
parent 0062da5a6b
commit 0fbc700f1b
7 changed files with 192 additions and 98 deletions
+13 -1
View File
@@ -4,18 +4,30 @@ Debye-specific plugins and configs for BEC
## How to
### Open visual studio code
### Visual studio code
To open
```
ssh x01da-bec-001
cd /data/test/x01da-test-bec/bec_deployment
code
```
To run tests directly in vs code terminal
```
. /data/test/x01da-test-bec/bec_deployment/bec_venv/bin/activate
cd /data/test/x01da-test-bec/bec_deployment/debye_bec
pytest -vv ./tests
```
### Git
```
git pull
git push origin feat/add_advanced_scan_modes
git status
```
If git claims to not know the author identity
```
git config --global user.email "you@example.com"
git config --global user.name "gac-x01da"
```
### BEC Server
```
+67 -35
View File
@@ -1,23 +1,20 @@
""" Module for the Mo1 Bragg positioner of the Debye beamline.
The soft IOC is reachable via the EPICS prefix X01DA-OP-MO1:BRAGG: and connected
The softIOC is reachable via the EPICS prefix X01DA-OP-MO1:BRAGG: and connected
to a motor controller via web sockets. The Mo1 Bragg positioner is not only a
positioner, but also a scan controller to setup XAS and XRD scans. A few scan modes
are programmed in the controller, e.g. simple and advanced XAS scans + XRD triggering mode.
Note: For some of the Epics PVs, in particular action buttons, the suffix .PROC and
put_complete=True is used to ensure that the action is executed completely. This is believed
Note: For some of the Epics PVs, in particular action buttons, the put_complete=True is
used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action."""
import enum
import threading
import time
import traceback
from typeguard import typechecked
from debye_bec.devices.utils.mo1_bragg_utils import compute_spline
from dataclasses import dataclass
from typing import Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import (
@@ -31,13 +28,14 @@ from ophyd import (
Signal,
Staged,
)
from typeguard import typechecked
from ophyd.utils import LimitError
from ophyd_devices.utils import bec_scaninfo_mixin, bec_utils
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
from debye_bec.devices.utils.mo1_bragg_utils import compute_spline
logger = bec_logger.logger
class ScanControlScanStatus(int, enum.Enum):
"""Enum class for the scan status of the Bragg positioner"""
@@ -64,7 +62,7 @@ class ScanControlLoadMessage(int, enum.Enum):
ERR_SCAN_TIME = 11
ERR_SCAN_VEL_TOO_HI = 12
ERR_SCAN_ANGLE_OUT_OF_LIM = 13
ERR_SCAN_HIGH_VEL_LAR_42 = 14
ERR_SCAN_HIGH_VEL_LAR_42 = 14
ERR_SCAN_MODE_INVALID = 15
@@ -172,6 +170,8 @@ class Mo1BraggScanSettings(Device):
a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="config", auto_monitor=True)
class Mo1BraggCalculator(Device):
"""Mo1 Bragg PVs to convert angle to energy or vice-versa."""
calc_reset = Cpt(EpicsSignal, suffix="calc_reset", kind="config", put_complete=True)
calc_done = Cpt(EpicsSignalRO, suffix="calc_done_RBV", kind="config")
calc_energy = Cpt(EpicsSignalWithRBV, suffix="calc_energy", kind="config")
@@ -268,7 +268,7 @@ class Mo1Bragg(Device, PositionerBase):
velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True)
# Angle PVs
# TODO makd angle motion a pseudo motor
# TODO make angle motion a pseudo motor
feedback_pos_angle = Cpt(
EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True
)
@@ -331,7 +331,8 @@ class Mo1Bragg(Device, PositionerBase):
self.scan_control.scan_progress.subscribe(self._progress_update, run=False)
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback to SUB_PROGRESS subscribers, i.e. BEC.
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
Args:
value (int) : current progress value
@@ -405,7 +406,8 @@ class Mo1Bragg(Device, PositionerBase):
# pylint: disable=arguments-differ
def check_value(self, value: float) -> None:
"""Method to check if a value is within limits of the positioner. Called by PositionerBase.move()
"""Method to check if a value is within limits of the positioner.
Called by PositionerBase.move()
Args:
value (float) : value to move axis to.
@@ -423,15 +425,19 @@ class Mo1Bragg(Device, PositionerBase):
status: DeviceStatus,
update_frequency: float = 0.1,
) -> None:
"""Method to be called in the move thread to move the Bragg positioner to the target position.
"""Method to be called in the move thread to move the Bragg positioner
to the target position.
Args:
target_pos (float) : target position for the motion
move_cpt (Cpt) : component to set the target position on the IOC,
either setpoint or setpoint_abs_angle depending on the move type
read_cpt (Cpt) : component to read the current position of the motion, readback or feedback_pos_angle
either setpoint or setpoint_abs_angle depending
on the move type
read_cpt (Cpt) : component to read the current position of the motion,
readback or feedback_pos_angle
status (DeviceStatus) : status object to set the status of the motion
update_frequency (float): Optional, frequency to update the current position of the motion, defaults to 0.1s
update_frequency (float): Optional, frequency to update the current position of
the motion, defaults to 0.1s
"""
try:
# Set the target position on IOC
@@ -452,11 +458,13 @@ class Mo1Bragg(Device, PositionerBase):
status.set_exception(exc=exc)
def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus:
"""Move the Bragg positioner to the specified value, allows to switch between move types angle and energy.
"""Move the Bragg positioner to the specified value, allows to
switch between move types angle and energy.
Args:
value (float) : target value for the motion
move_type (str | MoveType) : Optional, specify the type of move, either 'energy' or 'angle'
move_type (str | MoveType) : Optional, specify the type of move,
either 'energy' or 'angle'
Returns:
DeviceStatus : status object to track the motion
@@ -535,7 +543,11 @@ class Mo1Bragg(Device, PositionerBase):
self.scan_settings.s_scan_scantime.put(scan_time)
@typechecked
def convert_angle_energy(self, mode:Literal["AngleToEnergy", "EnergyToAngle"], inp:float) -> float:
def convert_angle_energy(
self,
mode:Literal["AngleToEnergy", "EnergyToAngle"],
inp:float
) -> float:
"""Calculate energy to angle or vice versa
Args:
@@ -545,7 +557,6 @@ class Mo1Bragg(Device, PositionerBase):
Returns:
output (float): Converted angle or energy
"""
# TODO the calc_reset should reset from the IOC itself, check EPICS implementation with Alvin/Xiaoqiang
self.calculator.calc_reset.put(0)
self.calculator.calc_reset.put(1)
@@ -576,7 +587,14 @@ class Mo1Bragg(Device, PositionerBase):
elif mode == "EnergyToAngle":
return self.calculator.calc_angle.get()
def set_advanced_xas_settings(self, low: float, high:float, scan_time: float, p_kink: float, e_kink: float) -> None:
def set_advanced_xas_settings(
self,
low: float,
high:float,
scan_time: float,
p_kink: float,
e_kink: float
) -> None:
"""Set Advanced XAS parameters for upcoming scan.
Args:
@@ -586,9 +604,9 @@ class Mo1Bragg(Device, PositionerBase):
p_kink (float): Position of kink in %
e_kink (float): Energy of kink in eV
"""
## TODO Add fallback solution for automatic testing, otherwise test will fail
## because no monochromator will calculate the angle
## Unsure how to implement this
# TODO Add fallback solution for automatic testing, otherwise test will fail
# because no monochromator will calculate the angle
# Unsure how to implement this
move_type = self.move_type.get()
if move_type == MoveType.ENERGY:
@@ -599,7 +617,13 @@ class Mo1Bragg(Device, PositionerBase):
else:
raise Mo1BraggError("MoveType Angle not implemented for advanced scans, use Energy")
pos, vel, dt = compute_spline(low_deg=low_deg, high_deg=high_deg, p_kink =p_kink, e_kink_deg = e_kink_deg, scan_time=scan_time)
pos, vel, dt = compute_spline(
low_deg=low_deg,
high_deg=high_deg,
p_kink =p_kink,
e_kink_deg = e_kink_deg,
scan_time=scan_time,
)
self.scan_settings.a_scan_pos.set(pos)
self.scan_settings.a_scan_vel.set(vel)
@@ -661,7 +685,7 @@ class Mo1Bragg(Device, PositionerBase):
def kickoff(self):
"""Kickoff the device, called from BEC."""
scan_duration = self.scan_control.scan_duration.get()
# TODO implement better logic for infinite scans, at least bring it up with Debye
#TODO implement better logic for infinite scans, at least bring it up with Debye
start_func = (
self.scan_control.scan_start_infinite.put
if scan_duration < 0.1
@@ -825,7 +849,8 @@ class Mo1Bragg(Device, PositionerBase):
def complete(self) -> DeviceStatus:
"""Complete the acquisition.
The method returns a DeviceStatus object that resolves to set_finished or set_exception once the acquisition is completed.
The method returns a DeviceStatus object that resolves to set_finished
or set_exception once the acquisition is completed.
"""
status = self.on_complete()
if isinstance(status, DeviceStatus):
@@ -898,11 +923,14 @@ class Mo1Bragg(Device, PositionerBase):
For EPICs PVs, an example usage is pasted at the bottom.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
signal_conditions (list[tuple]): tuple of executable calls for conditions
(get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked. The function relies on the self.stopped property to be set
check_stopped (bool): True if stopped flag should be checked.
The function relies on the self.stopped property to be set
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
all_signals (bool): True if all signals should be True,
False if any signal should be True
Returns:
bool: True if all signals are in the desired state, False if timeout is reached
@@ -936,15 +964,17 @@ class Mo1Bragg(Device, PositionerBase):
exception_on_timeout: Exception = None,
) -> DeviceStatus:
"""Wrapper around wait_for_signals to be started in thread and attach a DeviceStatus object.
This allows BEC to perform actinos in parallel and not be blocked by method calls on a device.
Typically used for on_trigger, on_complete methods or also the kickoff.
This allows BEC to perform actinos in parallel and not be blocked by method
calls on a device. Typically used for on_trigger, on_complete methods or also the kickoff.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
signal_conditions (list[tuple]): tuple of executable calls for conditions
(get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
all_signals (bool): True if all signals should be True,
False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
Returns:
@@ -970,11 +1000,13 @@ class Mo1Bragg(Device, PositionerBase):
Args:
status (DeviceStatus): DeviceStatus object to be set
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
signal_conditions (list[tuple]): tuple of executable calls for
conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
all_signals (bool): True if all signals should be True, False if
any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
"""
try:
+31 -10
View File
@@ -1,19 +1,29 @@
""" Module for additional utils of the Mo1 Bragg Positioner"""
import numpy as np
from scipy.interpolate import BSpline
################ Define Constants ############
SAFETY_FACTOR = 0.025 # safety factor to limit acceleration -> NEVER SET TO ZERO !
N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number, otherwise peak value will not be included
N_SAMPLES = 41 # number of samples to generate -> Always choose uneven number,
# otherwise peak value will not be included
DEGREE_SPLINE = 3 # DEGREE_SPLINE of spline, 3 works good
TIME_COMPENSATE_SPLINE = 0.0062 # time to be compensated each spline in s
POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values as used on ACS controller for simple scans
POSITION_COMPONSATION = 0.02 # angle to add at both limits, must be same values
# as used on ACS controller for simple scans
class Mo1UtilsSplineError(Exception):
""" Exception for spline computation"""
def compute_spline(low_deg:float, high_deg:float, p_kink:float, e_kink_deg:float, scan_time:float) -> tuple[float, float, float]:
def compute_spline(
low_deg:float,
high_deg:float,
p_kink:float,
e_kink_deg:float,
scan_time:float,
) -> tuple[float, float, float]:
""" Spline computation for the advanced scan mode
Args:
@@ -32,16 +42,27 @@ def compute_spline(low_deg:float, high_deg:float, p_kink:float, e_kink_deg:float
high_deg = high_deg + POSITION_COMPONSATION
if p_kink < 0 or p_kink > 100:
raise Mo1UtilsSplineError(f"Kink position not within range of [0..100%] for p_kink: {p_kink}")
raise Mo1UtilsSplineError("Kink position not within range of [0..100%]"+
f"for p_kink: {p_kink}")
if e_kink_deg < low_deg or e_kink_deg > high_deg:
raise Mo1UtilsSplineError(f"Kink energy not within selected energy range of scan, for e_kink_deg {e_kink_deg}, low_deg {low_deg} and high_deg {high_deg}.")
raise Mo1UtilsSplineError("Kink energy not within selected energy range of scan,"+
f"for e_kink_deg {e_kink_deg}, low_deg {low_deg} and"+
f"high_deg {high_deg}.")
tc1 = SAFETY_FACTOR / scan_time * TIME_COMPENSATE_SPLINE
t_kink = (scan_time - TIME_COMPENSATE_SPLINE - 2*(SAFETY_FACTOR - tc1)) * p_kink/100 + (SAFETY_FACTOR - tc1)
t_input = [0, SAFETY_FACTOR - tc1, t_kink , scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1, scan_time - TIME_COMPENSATE_SPLINE ]
p_input = [0, 0 , e_kink_deg - low_deg , high_deg - low_deg , high_deg - low_deg]
t_input = [0,
SAFETY_FACTOR - tc1,
t_kink,
scan_time - TIME_COMPENSATE_SPLINE - SAFETY_FACTOR + tc1,
scan_time - TIME_COMPENSATE_SPLINE]
p_input = [0,
0,
e_kink_deg - low_deg,
high_deg - low_deg,
high_deg - low_deg]
cv = np.stack((t_input, p_input)).T # spline coefficients
max_param = len(cv) - DEGREE_SPLINE
@@ -58,10 +79,10 @@ def compute_spline(low_deg:float, high_deg:float, p_kink:float, e_kink_deg:float
acc = []
for item in a:
acc.append(0) if item[1] == 0 else acc.append(item[1]/item[0])
acc.append(0) if item[1] == 0 else acc.append(item[1]/item[0])
jerk = []
for item in j:
jerk.append(0) if item[1] == 0 else jerk.append(item[1]/item[0])
jerk.append(0) if item[1] == 0 else jerk.append(item[1]/item[0])
dt = np.zeros(len(tim))
for i in np.arange(len(tim)):
@@ -70,4 +91,4 @@ def compute_spline(low_deg:float, high_deg:float, p_kink:float, e_kink_deg:float
else:
dt[i] = 1000*(tim[i]-tim[i-1])
return pos, vel, dt
return pos, vel, dt
+44 -22
View File
@@ -8,6 +8,7 @@ from bec_server.scan_server.scans import AsyncFlyScanBase
class XASSimpleScan(AsyncFlyScanBase):
"""Class for the XAS simple scan"""
scan_name = "xas_simple_scan"
scan_type = "fly"
@@ -30,15 +31,17 @@ class XASSimpleScan(AsyncFlyScanBase):
**kwargs,
):
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration
is the duration of the scan. If scan duration is set to 0, the scan will run infinitely.
Start and Stop define the energy range for the scan, scan_time is the time for one scan
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
scan will run infinitely.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one scan cycle.
scan_duration (float): Duration of the scan.
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
"""
@@ -60,7 +63,8 @@ class XASSimpleScan(AsyncFlyScanBase):
yield None
def pre_scan(self):
"""Pre Scan action. Ensure the motor movetype is set to energy, then check limits for start/end energy.
"""Pre Scan action. Ensure the motor movetype is set to energy, then check
limits for start/end energy.
#TODO Remove once the motor movetype is removed and ANGLE motion is a pseudo motor.
"""
yield from self.stubs.send_rpc_and_wait(self.motor, "move_type.set", "energy")
@@ -89,7 +93,11 @@ class XASSimpleScan(AsyncFlyScanBase):
while True:
# Readout monitored devices
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary", point_id=self.point_id)
yield from self.stubs.read_and_wait(
group="primary",
wait_group="readout_primary",
point_id=self.point_id,
)
# Check if complete call on Mo1 Bragg has been finished
status = self.stubs.get_req_status(
device=self.motor, RID=self.metadata["RID"], DIID=target_diid
@@ -103,6 +111,8 @@ class XASSimpleScan(AsyncFlyScanBase):
class XASSimpleScanWithXRD(XASSimpleScan):
"""Class for the XAS simple scan with XRD"""
scan_name = "xas_simple_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
@@ -140,13 +150,16 @@ class XASSimpleScanWithXRD(XASSimpleScan):
xrd_enable_low (bool): Enable XRD triggering for the low energy range.
num_trigger_low (int): Number of triggers for the low energy range.
exp_time_low (float): Exposure time for the low energy range.
cycle_low (int): Specify how often the triggers should be considered, every nth cycle for low
cycle_low (int): Specify how often the triggers should be considered,
every nth cycle for low
xrd_enable_high (bool): Enable XRD triggering for the high energy range.
num_trigger_high (int): Number of triggers for the high energy range.
exp_time_high (float): Exposure time for the high energy range.
cycle_high (int): Specify how often the triggers should be considered, every nth cycle for high
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
cycle_high (int): Specify how often the triggers should be considered,
every nth cycle for high
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
"""
@@ -168,7 +181,8 @@ class XASSimpleScanWithXRD(XASSimpleScan):
self.cycle_high = cycle_high
class XASAdvancedScan(XASSimpleScan):
"""Class for the XAS advanced scan"""
scan_name = "xas_advanced_scan"
gui_config = {
"Movement Parameters": ["start", "stop"],
@@ -188,9 +202,11 @@ class XASAdvancedScan(XASSimpleScan):
**kwargs,
):
"""The xas_advanced_scan is an oscillation motion on the mono motor.
Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration
is the duration of the scan. If scan duration is set to 0, the scan will run infinitely.
p_kink and e_kink add a kink to the motion profile to slow down in the exafs region of the scan.
Start and Stop define the energy range for the scan, scan_time is the
time for one scan cycle and scan_duration is the duration of the scan.
If scan duration is set to 0, the scan will run infinitely.
p_kink and e_kink add a kink to the motion profile to slow down in the
exafs region of the scan.
Args:
start (float): Start angle for the scan.
@@ -199,7 +215,8 @@ class XASAdvancedScan(XASSimpleScan):
scan_duration (float): Total duration of the scan.
p_kink (float): Position of the kink.
e_kink (float): Energy of the kink.
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
@@ -216,7 +233,8 @@ class XASAdvancedScan(XASSimpleScan):
self.e_kink = e_kink
class XASAdvancedScanWithXRD(XASAdvancedScan):
"""Class for the XAS advanced scan with XRD"""
scan_name = "xas_advanced_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
@@ -247,9 +265,10 @@ class XASAdvancedScanWithXRD(XASAdvancedScan):
):
"""The xas_advanced_scan is an oscillation motion on the mono motor
with XRD triggering at low and high energy ranges.
Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration
is the duration of the scan. If scan duration is set to 0, the scan will run infinitely.
p_kink and e_kink add a kink to the motion profile to slow down in the exafs region of the scan.
Start and Stop define the energy range for the scan, scan_time is the time for
one scan cycle and scan_duration is the duration of the scan. If scan duration
is set to 0, the scan will run infinitely. p_kink and e_kink add a kink to the
motion profile to slow down in the exafs region of the scan.
Args:
start (float): Start angle for the scan.
@@ -261,12 +280,15 @@ class XASAdvancedScanWithXRD(XASAdvancedScan):
xrd_enable_low (bool): Enable XRD triggering for the low energy range.
num_trigger_low (int): Number of triggers for the low energy range.
exp_time_low (float): Exposure time for the low energy range.
cycle_low (int): Specify how often the triggers should be considered, every nth cycle for low
cycle_low (int): Specify how often the triggers should be considered,
every nth cycle for low
xrd_enable_high (bool): Enable XRD triggering for the high energy range.
num_trigger_high (int): Number of triggers for the high energy range.
exp_time_high (float): Exposure time for the high energy range.
cycle_high (int): Specify how often the triggers should be considered, every nth cycle for high
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
cycle_high (int): Specify how often the triggers should be considered,
every nth cycle for high
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
@@ -290,4 +312,4 @@ class XASAdvancedScanWithXRD(XASAdvancedScan):
self.xrd_enable_high = xrd_enable_high
self.num_trigger_high = num_trigger_high
self.exp_time_high = exp_time_high
self.cycle_high = cycle_high
self.cycle_high = cycle_high
+5 -20
View File
@@ -57,7 +57,7 @@ def test_init(mock_bragg):
assert dev.prefix == "X01DA-OP-MO1:BRAGG:"
assert dev.move_type.get() == MoveType.ENERGY
assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV"
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs.PROC"
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs"
def test_check_value(mock_bragg):
@@ -149,24 +149,6 @@ def test_set_xas_settings(mock_bragg):
assert dev.scan_settings.s_scan_angle_hi.get() == 20
assert dev.scan_settings.s_scan_scantime.get() == 1
def test_set_advanced_xas_settings(mock_bragg):
dev = mock_bragg
dev.move_type.set(MoveType.ENERGY)
dev.set_advanced_xas_settings(low=10000, high=12000, scan_time=0.5)
p_prev = 0
for p in dev.scan_settings.a_scan_pos.get():
assert p > 0 and p < 40, "maximum positions of monochromator exceeded"
assert p >= p_prev, "values in a_scan_pos must be in ascending order"
p_prev = p
for v in dev.scan_settings.a_scan_vel.get():
assert v > -120 and v < 120, "maximum speed of monochromator exceeded"
t_tot
for t in dev.scan_settings.a_scan_time.get():
assert t >= 0, "time interval must be greater then 0"
t_tot = t_tot + t
assert t_tot <= 0.5, "total time must be smaller or equal to scan_time"
# TODO add test for MoveType.Angle (or delete MoveType.Angle)
def test_set_xrd_settings(mock_bragg):
dev = mock_bragg
dev.set_xrd_settings(
@@ -188,7 +170,6 @@ def test_set_xrd_settings(mock_bragg):
assert dev.scan_settings.xrd_every_n_lo.get() == 1
assert dev.scan_settings.xrd_every_n_hi.get() == 5
def test_set_control_settings(mock_bragg):
dev = mock_bragg
dev.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=10)
@@ -505,6 +486,8 @@ def test_stage(mock_bragg, scan_worker_mock, msg):
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"],
e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=False,
@@ -529,6 +512,8 @@ def test_stage(mock_bragg, scan_worker_mock, msg):
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"],
e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
@@ -0,0 +1,22 @@
# pylint: skip-file
import debye_bec.devices.utils.mo1_bragg_utils as utils
import numpy as np
def test_compute_spline():
p, v, dt = utils.compute_spline(low_deg=10, high_deg=12, p_kink=50, e_kink_deg=11, scan_time=0.5)
rtol = 1e-6
atol = 1e-3
p_desired = [9.98,9.98376125,9.99479,10.01270375,10.03712,10.06765625,10.10393,10.14555875,10.19216,10.24335125,10.29875,10.35797375,10.42064,10.48636625,10.55477,10.62546875,10.69808,10.77222125,10.84751,10.92356375,11.,11.07643625,11.15249,11.22777875,11.30192,11.37453125,11.44523,11.51363375,11.57936,11.64202625,11.70125,11.75664875,11.80784,11.85444125,11.89607,11.93234375,11.96288,11.98729625,12.00521,12.01623875,12.02]
v_desired = [0.,1.50156441,2.35715667,2.90783907,3.29035796,3.57019636,3.78263174,3.9483388,4.08022441,4.18675043,4.27368333,4.34507577,4.40384627,4.45213618,4.49153736,4.52324148,4.54814006,4.5668924,4.57997194,4.58769736,4.59025246,4.58769736,4.57997194,4.5668924,4.54814006,4.52324148,4.49153736,4.45213618,4.40384627,4.34507577,4.27368333,4.18675043,4.08022441,3.9483388,3.78263174,3.57019636,3.29035796,2.90783907,2.35715667,1.50156441,0.]
dt_desired = [0.,4.34081063,5.57222438,6.73882688,7.84061813,8.87759812,9.84976688,10.75712437,11.59967063,12.37740563,13.09032937,13.73844188,14.32174313,14.84023312,15.29391188,15.68277937,16.00683562,16.26608063,16.46051438,16.59013687,16.65494813,16.65494813,16.59013687,16.46051438,16.26608063,16.00683562,15.68277938,15.29391188,14.84023312,14.32174313,13.73844187,13.09032938,12.37740562,11.59967063,10.75712437,9.84976687,8.87759813,7.84061812,6.73882688,5.57222437,4.34081063]
np.testing.assert_allclose(p, p_desired, rtol, atol)
np.testing.assert_allclose(v, v_desired, rtol, atol)
np.testing.assert_allclose(dt, dt_desired, rtol, atol)
assert(utils.SAFETY_FACTOR == 0.025)
assert(utils.N_SAMPLES == 41)
assert(utils.DEGREE_SPLINE == 3)
assert(utils.TIME_COMPENSATE_SPLINE == 0.0062)
assert(utils.POSITION_COMPONSATION == 0.02)
+10 -10
View File
@@ -106,7 +106,7 @@ def test_xas_simple_scan():
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
@@ -118,7 +118,7 @@ def test_xas_simple_scan():
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
@@ -267,7 +267,7 @@ def test_xas_simple_scan_with_xrd():
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
@@ -279,7 +279,7 @@ def test_xas_simple_scan_with_xrd():
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
@@ -361,7 +361,7 @@ def test_xas_advanced_scan():
"async": [],
},
"num_points": None,
"positions": [8000, 9000],
"positions": [8000.0, 9000.0],
"scan_name": "xas_advanced_scan",
"scan_type": "fly",
},
@@ -408,7 +408,7 @@ def test_xas_advanced_scan():
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id"},
metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id", "point_id": 0},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
@@ -420,7 +420,7 @@ def test_xas_advanced_scan():
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id"},
metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id", "point_id": 0},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
@@ -516,7 +516,7 @@ def test_xas_advanced_scan_with_xrd():
"async": [],
},
"num_points": None,
"positions": [8000, 9000],
"positions": [8000.0, 9000.0],
"scan_name": "xas_advanced_scan_with_xrd",
"scan_type": "fly",
},
@@ -563,7 +563,7 @@ def test_xas_advanced_scan_with_xrd():
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id"},
metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id", "point_id": 1},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
@@ -575,7 +575,7 @@ def test_xas_advanced_scan_with_xrd():
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id"},
metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id", "point_id": 1},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},