Files
debye_bec/tests/tests_devices/test_mo1_bragg.py

358 lines
13 KiB
Python

# pylint: skip-file
import os
import threading
import time
from dataclasses import fields
from unittest import mock
import ophyd
import pytest
from bec_lib.messages import ScanStatusMessage
from ophyd.utils import LimitError
from ophyd_devices.tests.utils import MockPV
# from bec_server.device_server.tests.utils import DMMock
from debye_bec.devices.mo1_bragg import (
Mo1Bragg,
Mo1BraggError,
MoveType,
ScanControlLoadMessage,
ScanControlMode,
ScanControlScanStatus,
)
# TODO: move this function to ophyd_devices; it is duplicated in csaxs_bec and needed by other plugin repositories
from debye_bec.devices.test_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def mock_bragg():
    """Yield a ``Mo1Bragg`` device created while ``ophyd.cl`` is patched.

    On the patched control layer, ``get_pv`` is set to ``MockPV`` and
    ``thread_class`` to ``threading.Thread``. ``patch_dual_pvs`` is applied to the
    device before it is handed to the test. The patches remain active until the
    test using the fixture has finished.
    """
    cl_patch = mock.patch.object(ophyd, "cl")
    # NOTE(review): this call replaces the module attribute ``Mo1Bragg`` with the
    # string "_on_init"; it does not patch the ``_on_init`` method of the class
    # imported at the top of this file. ``mock.patch.object(Mo1Bragg, "_on_init")``
    # was presumably intended -- confirm before changing, because that would alter
    # what runs while the device is constructed.
    on_init_patch = mock.patch("debye_bec.devices.mo1_bragg.Mo1Bragg", "_on_init")
    with cl_patch as mock_cl, on_init_patch:
        mock_cl.get_pv = MockPV
        mock_cl.thread_class = threading.Thread
        bragg = Mo1Bragg(name="bragg", prefix="X01DA-OP-MO1:BRAGG:")
        patch_dual_pvs(bragg)
        yield bragg
def test_init(mock_bragg):
    """Check name, prefix, initial move type and two PV names of the device."""
    bragg = mock_bragg
    # Identity of the device
    assert bragg.name == "bragg"
    assert bragg.prefix == "X01DA-OP-MO1:BRAGG:"
    # The move type is expected to read ENERGY right after construction
    assert bragg.move_type.get() == MoveType.ENERGY
    # PV names are built from the prefix
    assert bragg.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV"
    assert bragg.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs.PROC"
def test_check_value(mock_bragg):
    """Check that ``check_value`` applies the limits matching the active move type."""
    bragg = mock_bragg
    bragg.low_lim._read_pv.mock_data = 0
    bragg.high_lim._read_pv.mock_data = 1
    bragg.low_limit_angle._read_pv.mock_data = 10
    bragg.high_limit_angle._read_pv.mock_data = 20

    # Each case: (move type, value that must be accepted, value that must raise).
    # Energy is checked first, then angle.
    cases = (
        (MoveType.ENERGY, 0.5, 15),
        (MoveType.ANGLE, 15, 0.5),
    )
    for move_type, accepted, rejected in cases:
        bragg.move_type.set(move_type)
        # Must pass silently
        bragg.check_value(accepted)
        with pytest.raises(LimitError):
            bragg.check_value(rejected)
def test_egu(mock_bragg):
    """Check that ``egu`` reads "eV" initially and "deg" once the move type is ANGLE."""
    bragg = mock_bragg
    assert bragg.egu == "eV"
    bragg.move_type.set(MoveType.ANGLE)
    assert bragg.egu == "deg"
def test_move_succeeds(mock_bragg):
    """Check that a move to 0.5 finishes successfully and updates setpoint and ``move_abs``."""
    bragg = mock_bragg
    bragg.move_abs._read_pv.mock_data = 0
    moving_pv = bragg.motor_is_moving._read_pv
    # The patch stays active while the move runs and while the results are checked
    with mock.patch.object(moving_pv, "mock_data", side_effect=[0, 1]):
        move_status = bragg.move(0.5)
        # The move is resolved in a thread (_move_and_finish, which itself sleeps
        # 0.25 s), so wait before inspecting the status.
        time.sleep(1)
        assert move_status.done is True
        assert move_status.success is True
        assert bragg.setpoint.get() == 0.5
        assert bragg.move_abs.get() == 1
def test_stop_move(mock_bragg):
    """Check that ``stop`` ends a pending move with an unsuccessful status."""
    bragg = mock_bragg
    bragg.move_abs._read_pv.mock_data = 0
    bragg.motor_is_moving._read_pv.mock_data = 0

    move_status = bragg.move(0.5)
    # After a second the move must still be pending and not flagged as stopped
    time.sleep(1)
    assert move_status.done is False
    assert bragg._stopped == False

    bragg.stop()
    # Give the stop request time to reach the status object
    time.sleep(0.5)
    assert bragg._stopped == True
    assert move_status.done is True
    assert move_status.success is False
def test_set_xtal(mock_bragg):
    """Check that ``set_xtal`` writes the crystal selection, offsets and d-spacings."""
    bragg = mock_bragg
    crystal = bragg.crystal

    # Selecting "111" without further arguments: every signal reads the mock default 0
    bragg.set_xtal("111")
    untouched_signals = (
        crystal.offset_si111,
        crystal.offset_si311,
        crystal.d_spacing_si111,
        crystal.d_spacing_si311,
        crystal.xtal_enum,
    )
    for signal in untouched_signals:
        assert signal.get() == 0

    # Selecting "311" with explicit values: each keyword ends up in the signal of
    # the same name and the enum reads 1
    new_values = {
        "offset_si111": 1,
        "offset_si311": 2,
        "d_spacing_si111": 3,
        "d_spacing_si311": 4,
    }
    bragg.set_xtal("311", **new_values)
    for signal_name, expected in new_values.items():
        assert getattr(crystal, signal_name).get() == expected
    assert crystal.xtal_enum.get() == 1
def test_set_xas_settings(mock_bragg):
    """Check that ``set_xas_settings`` writes energy or angle ranges per move type."""
    bragg = mock_bragg
    settings = bragg.scan_settings

    # In ENERGY mode the range is expected in the energy signals
    bragg.move_type.set(MoveType.ENERGY)
    bragg.set_xas_settings(low=0.5, high=1, scan_time=0.1)
    assert settings.s_scan_energy_lo.get() == 0.5
    assert settings.s_scan_energy_hi.get() == 1
    assert settings.s_scan_scantime.get() == 0.1

    # In ANGLE mode the range is expected in the angle signals
    bragg.move_type.set(MoveType.ANGLE)
    bragg.set_xas_settings(low=10, high=20, scan_time=1)
    assert settings.s_scan_angle_lo.get() == 10
    assert settings.s_scan_angle_hi.get() == 20
    assert settings.s_scan_scantime.get() == 1
def test_set_xrd_settings(mock_bragg):
    """Check that ``set_xrd_settings`` forwards each argument to its scan-settings signal."""
    bragg = mock_bragg
    xrd_kwargs = {
        "enable_low": True,
        "enable_high": False,
        "num_trigger_low": 1,
        "num_trigger_high": 7,
        "exp_time_low": 1,
        "exp_time_high": 3,
        "cycle_low": 1,
        "cycle_high": 5,
    }
    bragg.set_xrd_settings(**xrd_kwargs)

    # Keyword argument -> name of the scan_settings signal expected to hold it
    signal_for_kwarg = {
        "enable_low": "xrd_enable_lo_enum",
        "enable_high": "xrd_enable_hi_enum",
        "num_trigger_low": "xrd_n_trigger_lo",
        "num_trigger_high": "xrd_n_trigger_hi",
        "exp_time_low": "xrd_time_lo",
        "exp_time_high": "xrd_time_hi",
        "cycle_low": "xrd_every_n_lo",
        "cycle_high": "xrd_every_n_hi",
    }
    settings = bragg.scan_settings
    for kwarg, signal_name in signal_for_kwarg.items():
        assert getattr(settings, signal_name).get() == xrd_kwargs[kwarg]
def test_set_control_settings(mock_bragg):
    """Check that ``set_scan_control_settings`` writes scan mode and scan duration."""
    bragg = mock_bragg
    scan_control = bragg.scan_control
    # Each case: (scan mode, scan duration)
    cases = (
        (ScanControlMode.SIMPLE, 10),
        (ScanControlMode.ADVANCED, 5),
    )
    for mode, duration in cases:
        bragg.set_scan_control_settings(mode=mode, scan_duration=duration)
        assert scan_control.scan_mode_enum.get() == mode
        assert scan_control.scan_duration.get() == duration
def test_update_scan_parameters(mock_bragg):
    """Check that ``_update_scan_parameter`` copies scan arguments into ``scan_parameter``.

    Every field of ``scan_parameter`` is expected to be ``None`` beforehand.
    Afterwards each field must equal the argument of the same name in the scan
    message, or ``None`` when the message carries no such argument.
    """
    dev = mock_bragg
    msg = ScanStatusMessage(
        scan_id="my_scan_id",
        status="closed",
        info={
            "args": {
                "start": 0,
                "end": 5,
                "scan_time": 1,
                "scan_duration": 10,
                "xrd_enable_low": True,
                "xrd_enable_high": False,
                "num_trigger_low": 1,
                "num_trigger_high": 7,
                "exp_time_low": 1,
                "exp_time_high": 3,
                "cycle_low": 1,
                "cycle_high": 5,
            }
        },
        metadata={},
    )
    mock_bragg.scaninfo.scan_msg = msg

    # Nothing has been loaded yet; compare to the None singleton by identity
    for field in fields(dev.scan_parameter):
        assert getattr(dev.scan_parameter, field.name) is None

    dev._update_scan_parameter()

    scan_args = msg.content["info"]["args"]
    for field in fields(dev.scan_parameter):
        # dict.get yields None for fields that have no matching scan argument
        assert getattr(dev.scan_parameter, field.name) == scan_args.get(field.name)
def test_kickoff_scan(mock_bragg):
    """Check that ``kickoff`` triggers the timed or the infinite scan start.

    With ``scan_duration`` at 1, ``scan_start_timer`` is expected to read 1 and the
    returned status must finish once ``scan_status`` reads RUNNING. With
    ``scan_duration`` at 0, ``scan_start_infinite`` is expected to read 1.
    """
    dev = mock_bragg
    scan_control = dev.scan_control

    # Scan with a duration
    scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.READY
    scan_control.scan_duration._read_pv.mock_data = 1
    scan_control.scan_start_timer._read_pv.mock_data = 0
    scan_control.scan_start_infinite._read_pv.mock_data = 0
    status = dev.kickoff()
    assert status.done is False
    scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING
    # Give the status time to pick up the RUNNING state
    time.sleep(0.2)
    assert status.done is True
    assert scan_control.scan_start_timer.get() == 1

    # Scan without a duration: reset the start signals and kick off once
    scan_control.scan_duration._read_pv.mock_data = 0
    scan_control.scan_start_timer._read_pv.mock_data = 0
    scan_control.scan_start_infinite._read_pv.mock_data = 0
    dev.kickoff()
    assert scan_control.scan_start_infinite.get() == 1
def test_complete(mock_bragg):
    """Check the status returned by ``complete`` for a finished and for a stopped scan."""
    bragg = mock_bragg
    scan_done = bragg.scan_control.scan_done

    # Scan finishes normally: the status resolves successfully once scan_done reads 1
    scan_done._read_pv.mock_data = 0
    complete_status = bragg.complete()
    assert complete_status.done is False
    assert complete_status.success is False
    scan_done._read_pv.mock_data = 1
    time.sleep(0.2)
    assert complete_status.done is True
    assert complete_status.success is True

    # Device is stopped while waiting: the status resolves without success
    scan_done._read_pv.mock_data = 0
    complete_status = bragg.complete()
    assert complete_status.done is False
    assert complete_status.success is False
    bragg.stop()
    time.sleep(0.2)
    assert complete_status.done is True
    assert complete_status.success is False
def test_unstage(mock_bragg):
    """Check the value of ``scan_val_reset`` after ``unstage`` for two ``scan_msg`` states."""
    scan_control = mock_bragg.scan_control
    # Each case: (value read from scan_msg, expected scan_val_reset after unstage)
    cases = (
        (ScanControlLoadMessage.SUCCESS, 1),
        (ScanControlLoadMessage.PENDING, 0),
    )
    for load_message, expected_reset in cases:
        scan_control.scan_val_reset._read_pv.mock_data = 0
        scan_control.scan_msg._read_pv.mock_data = load_message
        mock_bragg.unstage()
        assert scan_control.scan_val_reset.get() == expected_reset
def test_stage(mock_bragg):
    """Exercise ``stage`` for an unknown scan, a simple XAS scan, XAS with XRD, and re-staging.

    The scan message is attached to ``scaninfo`` and only its ``scan_name`` entry is
    changed between the individual cases.

    NOTE(review): indentation was lost in the copy of this file under review; the
    nesting of the ``with`` blocks below is reconstructed -- confirm against the
    repository version.
    """
    # Test unknown scan type first
    mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
    msg = ScanStatusMessage(
        scan_id="my_scan_id",
        status="closed",
        info={
            "args": {
                "start": 0,
                "end": 5,
                "scan_time": 1,
                "scan_duration": 10,
                "xrd_enable_low": True,
                "xrd_enable_high": False,
                "num_trigger_low": 1,
                "num_trigger_high": 7,
                "exp_time_low": 1,
                "exp_time_high": 3,
                "cycle_low": 1,
                "cycle_high": 5,
            }
        },
        metadata={},
    )
    mock_bragg.scaninfo.scan_msg = msg
    mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "unknown_fly_scan"})
    # Metadata loading and message checking are mocked so that stage() works on the
    # message prepared above
    with (
        mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata,
        mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg,
    ):
        # An unknown scan name must be rejected, but only after both mocked helpers ran
        with pytest.raises(Mo1BraggError):
            mock_bragg.stage()
        assert mock_check_scan_msg.call_count == 1
        assert mock_load_scan_metadata.call_count == 1
        # Test simple XAS scan
        mock_bragg.scaninfo.scan_msg.content["info"].update({"scan_name": "xas_simple_scan"})
        # The setters are mocked so that only the arguments passed by stage() are checked
        with (
            mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings,
            mock.patch.object(mock_bragg, "set_xrd_settings") as mock_xrd_settings,
            mock.patch.object(
                mock_bragg, "set_scan_control_settings"
            ) as mock_set_scan_control_settings,
        ):
            mock_bragg.stage()
            assert mock_xas_settings.call_args == mock.call(
                low=msg.content["info"]["args"]["start"],
                high=msg.content["info"]["args"]["end"],
                scan_time=msg.content["info"]["args"]["scan_time"],
            )
            # Without XRD in the scan name, the XRD settings are expected to be zeroed
            assert mock_xrd_settings.call_args == mock.call(
                enable_low=False,
                enable_high=False,
                num_trigger_low=0,
                num_trigger_high=0,
                exp_time_low=0,
                exp_time_high=0,
                cycle_low=0,
                cycle_high=0,
            )
            assert mock_set_scan_control_settings.call_args == mock.call(
                mode=ScanControlMode.SIMPLE,
                scan_duration=msg.content["info"]["args"]["scan_duration"],
            )
            mock_bragg.scaninfo.scan_msg.content["info"].update(
                {"scan_name": "xas_simple_scan_with_xrd"}
            )
            # Unstage mock_bragg to reset _staged
            mock_bragg.unstage()
            time.sleep(0.1)
            # Test simple XAS scan with XRD
            mock_bragg.stage()
            assert mock_xas_settings.call_args == mock.call(
                low=msg.content["info"]["args"]["start"],
                high=msg.content["info"]["args"]["end"],
                scan_time=msg.content["info"]["args"]["scan_time"],
            )
            # With XRD in the scan name, the XRD arguments of the message are passed on
            assert mock_xrd_settings.call_args == mock.call(
                enable_low=msg.content["info"]["args"]["xrd_enable_low"],
                enable_high=msg.content["info"]["args"]["xrd_enable_high"],
                num_trigger_low=msg.content["info"]["args"]["num_trigger_low"],
                num_trigger_high=msg.content["info"]["args"]["num_trigger_high"],
                exp_time_low=msg.content["info"]["args"]["exp_time_low"],
                exp_time_high=msg.content["info"]["args"]["exp_time_high"],
                cycle_low=msg.content["info"]["args"]["cycle_low"],
                cycle_high=msg.content["info"]["args"]["cycle_high"],
            )
            assert mock_set_scan_control_settings.call_args == mock.call(
                mode=ScanControlMode.SIMPLE,
                scan_duration=msg.content["info"]["args"]["scan_duration"],
            )
            # Test redundant staging: the device is still staged from the call above
            with pytest.raises(ophyd.utils.errors.RedundantStaging):
                mock_bragg.stage()