Files
debye_bec/tests/tests_devices/test_mo1_bragg.py

437 lines
17 KiB
Python

# pylint: skip-file
import os
import threading
import time
from dataclasses import fields
from unittest import mock
import ophyd
import pytest
from bec_lib.messages import ScanQueueMessage, ScanStatusMessage
from bec_server.scan_server.scan_assembler import ScanAssembler
from bec_server.scan_server.scan_queue import RequestBlock
from bec_server.scan_server.scan_worker import ScanWorker
from bec_server.scan_server.tests.fixtures import scan_server_mock
from ophyd.utils import LimitError
from ophyd_devices.tests.utils import MockPV
# from bec_server.device_server.tests.utils import DMMock
from debye_bec.devices.mo1_bragg import (
Mo1Bragg,
Mo1BraggError,
MoveType,
ScanControlLoadMessage,
ScanControlMode,
ScanControlScanStatus,
)
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
from debye_bec.devices.test_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def scan_worker_mock(scan_server_mock):
scan_server_mock.device_manager.connector = mock.MagicMock()
scan_worker = ScanWorker(parent=scan_server_mock)
yield scan_worker
@pytest.fixture(scope="function")
def mock_bragg():
name = "bragg"
prefix = "X01DA-OP-MO1:BRAGG:"
with (
mock.patch.object(ophyd, "cl") as mock_cl,
mock.patch("debye_bec.devices.mo1_bragg.Mo1Bragg", "_on_init"),
):
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Mo1Bragg(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_init(mock_bragg):
dev = mock_bragg
assert dev.name == "bragg"
assert dev.prefix == "X01DA-OP-MO1:BRAGG:"
assert dev.move_type.get() == MoveType.ENERGY
assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV"
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs.PROC"
def test_check_value(mock_bragg):
dev = mock_bragg
dev.low_lim._read_pv.mock_data = 0
dev.high_lim._read_pv.mock_data = 1
dev.low_limit_angle._read_pv.mock_data = 10
dev.high_limit_angle._read_pv.mock_data = 20
# Check that limits are taken correctly from angle or energy
# Energy first
move_type = MoveType.ENERGY
dev.move_type.set(move_type)
# nothing happens
dev.check_value(0.5)
with pytest.raises(LimitError):
dev.check_value(15)
# Angle next
move_type = MoveType.ANGLE
dev.move_type.set(move_type)
dev.check_value(15)
with pytest.raises(LimitError):
dev.check_value(0.5)
def test_egu(mock_bragg):
dev = mock_bragg
assert dev.egu == "eV"
dev.move_type.set(MoveType.ANGLE)
assert dev.egu == "deg"
def test_move_succeeds(mock_bragg):
dev = mock_bragg
dev.move_abs._read_pv.mock_data = 0
# Move succeeds
with mock.patch.object(dev.motor_is_moving._read_pv, "mock_data", side_effect=[0, 1]):
status = dev.move(0.5)
# Sleep needed to allow thread to resolive in _move_and_finish, i.e. and the 0.25s sleep inside the function
time.sleep(1)
assert status.done is True
assert status.success is True
assert dev.setpoint.get() == 0.5
assert dev.move_abs.get() == 1
def test_stop_move(mock_bragg):
dev = mock_bragg
dev.move_abs._read_pv.mock_data = 0
dev.motor_is_moving._read_pv.mock_data = 0
# Move fails
status = dev.move(0.5)
time.sleep(1)
assert status.done is False
assert dev._stopped == False
dev.stop()
time.sleep(0.5)
assert dev._stopped == True
assert status.done is True
assert status.success is False
def test_set_xtal(mock_bragg):
dev = mock_bragg
dev.set_xtal("111")
# Default values for mock
assert dev.crystal.offset_si111.get() == 0
assert dev.crystal.offset_si311.get() == 0
assert dev.crystal.d_spacing_si111.get() == 0
assert dev.crystal.d_spacing_si311.get() == 0
assert dev.crystal.xtal_enum.get() == 0
dev.set_xtal("311", offset_si111=1, offset_si311=2, d_spacing_si111=3, d_spacing_si311=4)
assert dev.crystal.offset_si111.get() == 1
assert dev.crystal.offset_si311.get() == 2
assert dev.crystal.d_spacing_si111.get() == 3
assert dev.crystal.d_spacing_si311.get() == 4
assert dev.crystal.xtal_enum.get() == 1
def test_set_xas_settings(mock_bragg):
dev = mock_bragg
dev.move_type.set(MoveType.ENERGY)
dev.set_xas_settings(low=0.5, high=1, scan_time=0.1)
assert dev.scan_settings.s_scan_energy_lo.get() == 0.5
assert dev.scan_settings.s_scan_energy_hi.get() == 1
assert dev.scan_settings.s_scan_scantime.get() == 0.1
dev.move_type.set(MoveType.ANGLE)
dev.set_xas_settings(low=10, high=20, scan_time=1)
assert dev.scan_settings.s_scan_angle_lo.get() == 10
assert dev.scan_settings.s_scan_angle_hi.get() == 20
assert dev.scan_settings.s_scan_scantime.get() == 1
def test_set_xrd_settings(mock_bragg):
dev = mock_bragg
dev.set_xrd_settings(
enable_low=True,
enable_high=False,
num_trigger_low=1,
num_trigger_high=7,
exp_time_low=1,
exp_time_high=3,
cycle_low=1,
cycle_high=5,
)
assert dev.scan_settings.xrd_enable_lo_enum.get() == True
assert dev.scan_settings.xrd_enable_hi_enum.get() == False
assert dev.scan_settings.xrd_n_trigger_lo.get() == 1
assert dev.scan_settings.xrd_n_trigger_hi.get() == 7
assert dev.scan_settings.xrd_time_lo.get() == 1
assert dev.scan_settings.xrd_time_hi.get() == 3
assert dev.scan_settings.xrd_every_n_lo.get() == 1
assert dev.scan_settings.xrd_every_n_hi.get() == 5
def test_set_control_settings(mock_bragg):
dev = mock_bragg
dev.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=10)
assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.SIMPLE
assert dev.scan_control.scan_duration.get() == 10
dev.set_scan_control_settings(mode=ScanControlMode.ADVANCED, scan_duration=5)
assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.ADVANCED
assert dev.scan_control.scan_duration.get() == 5
def test_update_scan_parameters(mock_bragg):
dev = mock_bragg
msg = ScanStatusMessage(
scan_id="my_scan_id",
status="closed",
info={
"kwargs": {
"start": 0,
"stop": 5,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
"xrd_enable_high": False,
"num_trigger_low": 1,
"num_trigger_high": 7,
"exp_time_low": 1,
"exp_time_high": 3,
"cycle_low": 1,
"cycle_high": 5,
}
},
metadata={},
)
mock_bragg.scaninfo.scan_msg = msg
for field in fields(dev.scan_parameter):
assert getattr(dev.scan_parameter, field.name) == None
dev._update_scan_parameter()
for field in fields(dev.scan_parameter):
assert getattr(dev.scan_parameter, field.name) == msg.content["info"]["kwargs"].get(
field.name, None
)
def test_kickoff_scan(mock_bragg):
dev = mock_bragg
dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.READY
dev.scan_control.scan_duration._read_pv.mock_data = 1
dev.scan_control.scan_start_timer._read_pv.mock_data = 0
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
status = dev.kickoff()
assert status.done is False
dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING
time.sleep(0.2)
assert status.done is True
assert dev.scan_control.scan_start_timer.get() == 1
dev.scan_control.scan_duration._read_pv.mock_data = 0
dev.scan_control.scan_start_timer._read_pv.mock_data = 0
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
status = dev.kickoff()
dev.kickoff()
assert dev.scan_control.scan_start_infinite.get() == 1
def test_complete(mock_bragg):
dev = mock_bragg
dev.scan_control.scan_done._read_pv.mock_data = 0
# Normal case
status = dev.complete()
assert status.done is False
assert status.success is False
dev.scan_control.scan_done._read_pv.mock_data = 1
time.sleep(0.2)
assert status.done is True
assert status.success is True
# Stop called case
dev.scan_control.scan_done._read_pv.mock_data = 0
status = dev.complete()
assert status.done is False
assert status.success is False
dev.stop()
time.sleep(0.2)
assert status.done is True
assert status.success is False
def test_unstage(mock_bragg):
mock_bragg.timeout_for_pvwait = 0.5
mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
with mock.patch.object(mock_bragg.scan_control.scan_val_reset, "put") as mock_put:
mock_bragg.unstage()
assert mock_put.call_count == 0
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
with pytest.raises(TimeoutError):
mock_bragg.unstage()
assert mock_put.call_count == 1
@pytest.mark.parametrize(
"msg",
[
ScanQueueMessage(
scan_type="monitor_scan",
parameter={
"args": {},
"kwargs": {"device": "mo1_bragg", "start": 0, "stop": 10, "relative": True},
"num_points": 100,
},
queue="primary",
metadata={"RID": "test1234"},
),
ScanQueueMessage(
scan_type="xas_simple_scan",
parameter={
"args": {},
"kwargs": {
"motor": "mo1_bragg",
"start": 0,
"stop": 10,
"scan_time": 1,
"scan_duration": 10,
},
"num_points": 100,
},
queue="primary",
metadata={"RID": "test1234"},
),
ScanQueueMessage(
scan_type="xas_simple_scan_with_xrd",
parameter={
"args": {},
"kwargs": {
"motor": "mo1_bragg",
"start": 0,
"stop": 10,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
"xrd_enable_high": False,
"num_trigger_low": 1,
"num_trigger_high": 7,
"exp_time_low": 1,
"exp_time_high": 3,
"cycle_low": 1,
"cycle_high": 5,
},
"num_points": 10,
},
queue="primary",
metadata={"RID": "test1234"},
),
],
)
def test_stage(mock_bragg, scan_worker_mock, msg):
"""This test is important to check that the stage method of the device is working correctly.
Changing the kwargs names in the scans is tightly linked to the logic on the device, thus
it is important to check that the stage method is working correctly for the current implementation.
Therefor, this test creates a scaninfo message using the scan.open_scan() method to always check
agains the currently implemented scans vs. the logic on the device"""
# Create a scaninfo message using scans the ScanQueueMessages above, 3 cases of fly scan; for the general case the procedure is not defined yet
worker = scan_worker_mock
scan_server = worker.parent
rb = RequestBlock(msg, assembler=ScanAssembler(parent=scan_server))
with mock.patch.object(worker, "current_instruction_queue_item"):
worker.scan_motors = []
worker.readout_priority = {
"monitored": [],
"baseline": [],
"async": [],
"continuous": [],
"on_request": [],
}
open_scan_msg = list(rb.scan.open_scan())[0]
worker._initialize_scan_info(rb, open_scan_msg, msg.content["parameter"].get("num_points"))
scan_status_msg = ScanStatusMessage(
scan_id="test1234", status="closed", info=worker.current_scan_info, metadata={}
)
mock_bragg.scaninfo.scan_msg = scan_status_msg
# Ensure that ScanControlLoadMessage is set to SUCCESS
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
with (
mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata,
mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg,
mock.patch.object(mock_bragg, "on_unstage"),
):
scan_name = scan_status_msg.content["info"].get("scan_name", "")
# Chek the not implemented fly scan first, should raise Mo1BraggError
if scan_name not in ["xas_simple_scan", "xas_simple_scan_with_xrd"]:
with pytest.raises(Mo1BraggError):
mock_bragg.stage()
assert mock_check_scan_msg.call_count == 1
assert mock_load_scan_metadata.call_count == 1
else:
with (
mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings,
mock.patch.object(mock_bragg, "set_xrd_settings") as mock_xrd_settings,
mock.patch.object(
mock_bragg, "set_scan_control_settings"
) as mock_set_scan_control_settings,
):
# Check xas_simple_scan
if scan_name == "xas_simple_scan":
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=False,
enable_high=False,
num_trigger_low=0,
num_trigger_high=0,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
# Check xas_simple_scan_with_xrd
elif scan_name == "xas_simple_scan_with_xrd":
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=scan_status_msg.content["info"]["kwargs"][
"xrd_enable_high"
],
num_trigger_low=scan_status_msg.content["info"]["kwargs"][
"num_trigger_low"
],
num_trigger_high=scan_status_msg.content["info"]["kwargs"][
"num_trigger_high"
],
exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=scan_status_msg.content["info"]["kwargs"][
"exp_time_high"
],
cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"],
cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"],
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)