437 lines
17 KiB
Python
437 lines
17 KiB
Python
# pylint: skip-file
|
|
import os
|
|
import threading
|
|
import time
|
|
from dataclasses import fields
|
|
from unittest import mock
|
|
|
|
import ophyd
|
|
import pytest
|
|
from bec_lib.messages import ScanQueueMessage, ScanStatusMessage
|
|
from bec_server.scan_server.scan_assembler import ScanAssembler
|
|
from bec_server.scan_server.scan_queue import RequestBlock
|
|
from bec_server.scan_server.scan_worker import ScanWorker
|
|
from bec_server.scan_server.tests.fixtures import scan_server_mock
|
|
from ophyd.utils import LimitError
|
|
from ophyd_devices.tests.utils import MockPV
|
|
|
|
# from bec_server.device_server.tests.utils import DMMock
|
|
from debye_bec.devices.mo1_bragg import (
|
|
Mo1Bragg,
|
|
Mo1BraggError,
|
|
MoveType,
|
|
ScanControlLoadMessage,
|
|
ScanControlMode,
|
|
ScanControlScanStatus,
|
|
)
|
|
|
|
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
|
|
from debye_bec.devices.test_utils.utils import patch_dual_pvs
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def scan_worker_mock(scan_server_mock):
|
|
scan_server_mock.device_manager.connector = mock.MagicMock()
|
|
scan_worker = ScanWorker(parent=scan_server_mock)
|
|
yield scan_worker
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def mock_bragg():
|
|
name = "bragg"
|
|
prefix = "X01DA-OP-MO1:BRAGG:"
|
|
with (
|
|
mock.patch.object(ophyd, "cl") as mock_cl,
|
|
mock.patch("debye_bec.devices.mo1_bragg.Mo1Bragg", "_on_init"),
|
|
):
|
|
mock_cl.get_pv = MockPV
|
|
mock_cl.thread_class = threading.Thread
|
|
dev = Mo1Bragg(name=name, prefix=prefix)
|
|
patch_dual_pvs(dev)
|
|
yield dev
|
|
|
|
|
|
def test_init(mock_bragg):
|
|
dev = mock_bragg
|
|
assert dev.name == "bragg"
|
|
assert dev.prefix == "X01DA-OP-MO1:BRAGG:"
|
|
assert dev.move_type.get() == MoveType.ENERGY
|
|
assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV"
|
|
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs.PROC"
|
|
|
|
|
|
def test_check_value(mock_bragg):
|
|
dev = mock_bragg
|
|
dev.low_lim._read_pv.mock_data = 0
|
|
dev.high_lim._read_pv.mock_data = 1
|
|
dev.low_limit_angle._read_pv.mock_data = 10
|
|
dev.high_limit_angle._read_pv.mock_data = 20
|
|
# Check that limits are taken correctly from angle or energy
|
|
# Energy first
|
|
move_type = MoveType.ENERGY
|
|
dev.move_type.set(move_type)
|
|
# nothing happens
|
|
dev.check_value(0.5)
|
|
with pytest.raises(LimitError):
|
|
dev.check_value(15)
|
|
# Angle next
|
|
move_type = MoveType.ANGLE
|
|
dev.move_type.set(move_type)
|
|
dev.check_value(15)
|
|
with pytest.raises(LimitError):
|
|
dev.check_value(0.5)
|
|
|
|
|
|
def test_egu(mock_bragg):
|
|
dev = mock_bragg
|
|
assert dev.egu == "eV"
|
|
dev.move_type.set(MoveType.ANGLE)
|
|
assert dev.egu == "deg"
|
|
|
|
|
|
def test_move_succeeds(mock_bragg):
|
|
dev = mock_bragg
|
|
dev.move_abs._read_pv.mock_data = 0
|
|
# Move succeeds
|
|
with mock.patch.object(dev.motor_is_moving._read_pv, "mock_data", side_effect=[0, 1]):
|
|
status = dev.move(0.5)
|
|
# Sleep needed to allow thread to resolive in _move_and_finish, i.e. and the 0.25s sleep inside the function
|
|
time.sleep(1)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert dev.setpoint.get() == 0.5
|
|
assert dev.move_abs.get() == 1
|
|
|
|
|
|
def test_stop_move(mock_bragg):
|
|
dev = mock_bragg
|
|
dev.move_abs._read_pv.mock_data = 0
|
|
dev.motor_is_moving._read_pv.mock_data = 0
|
|
# Move fails
|
|
status = dev.move(0.5)
|
|
time.sleep(1)
|
|
assert status.done is False
|
|
assert dev._stopped == False
|
|
dev.stop()
|
|
time.sleep(0.5)
|
|
assert dev._stopped == True
|
|
assert status.done is True
|
|
assert status.success is False
|
|
|
|
|
|
def test_set_xtal(mock_bragg):
|
|
dev = mock_bragg
|
|
dev.set_xtal("111")
|
|
# Default values for mock
|
|
assert dev.crystal.offset_si111.get() == 0
|
|
assert dev.crystal.offset_si311.get() == 0
|
|
assert dev.crystal.d_spacing_si111.get() == 0
|
|
assert dev.crystal.d_spacing_si311.get() == 0
|
|
assert dev.crystal.xtal_enum.get() == 0
|
|
dev.set_xtal("311", offset_si111=1, offset_si311=2, d_spacing_si111=3, d_spacing_si311=4)
|
|
assert dev.crystal.offset_si111.get() == 1
|
|
assert dev.crystal.offset_si311.get() == 2
|
|
assert dev.crystal.d_spacing_si111.get() == 3
|
|
assert dev.crystal.d_spacing_si311.get() == 4
|
|
assert dev.crystal.xtal_enum.get() == 1
|
|
|
|
|
|
def test_set_xas_settings(mock_bragg):
|
|
dev = mock_bragg
|
|
dev.move_type.set(MoveType.ENERGY)
|
|
dev.set_xas_settings(low=0.5, high=1, scan_time=0.1)
|
|
assert dev.scan_settings.s_scan_energy_lo.get() == 0.5
|
|
assert dev.scan_settings.s_scan_energy_hi.get() == 1
|
|
assert dev.scan_settings.s_scan_scantime.get() == 0.1
|
|
dev.move_type.set(MoveType.ANGLE)
|
|
dev.set_xas_settings(low=10, high=20, scan_time=1)
|
|
assert dev.scan_settings.s_scan_angle_lo.get() == 10
|
|
assert dev.scan_settings.s_scan_angle_hi.get() == 20
|
|
assert dev.scan_settings.s_scan_scantime.get() == 1
|
|
|
|
|
|
def test_set_xrd_settings(mock_bragg):
|
|
dev = mock_bragg
|
|
dev.set_xrd_settings(
|
|
enable_low=True,
|
|
enable_high=False,
|
|
num_trigger_low=1,
|
|
num_trigger_high=7,
|
|
exp_time_low=1,
|
|
exp_time_high=3,
|
|
cycle_low=1,
|
|
cycle_high=5,
|
|
)
|
|
assert dev.scan_settings.xrd_enable_lo_enum.get() == True
|
|
assert dev.scan_settings.xrd_enable_hi_enum.get() == False
|
|
assert dev.scan_settings.xrd_n_trigger_lo.get() == 1
|
|
assert dev.scan_settings.xrd_n_trigger_hi.get() == 7
|
|
assert dev.scan_settings.xrd_time_lo.get() == 1
|
|
assert dev.scan_settings.xrd_time_hi.get() == 3
|
|
assert dev.scan_settings.xrd_every_n_lo.get() == 1
|
|
assert dev.scan_settings.xrd_every_n_hi.get() == 5
|
|
|
|
|
|
def test_set_control_settings(mock_bragg):
|
|
dev = mock_bragg
|
|
dev.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=10)
|
|
assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.SIMPLE
|
|
assert dev.scan_control.scan_duration.get() == 10
|
|
dev.set_scan_control_settings(mode=ScanControlMode.ADVANCED, scan_duration=5)
|
|
assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.ADVANCED
|
|
assert dev.scan_control.scan_duration.get() == 5
|
|
|
|
|
|
def test_update_scan_parameters(mock_bragg):
|
|
dev = mock_bragg
|
|
msg = ScanStatusMessage(
|
|
scan_id="my_scan_id",
|
|
status="closed",
|
|
info={
|
|
"kwargs": {
|
|
"start": 0,
|
|
"stop": 5,
|
|
"scan_time": 1,
|
|
"scan_duration": 10,
|
|
"xrd_enable_low": True,
|
|
"xrd_enable_high": False,
|
|
"num_trigger_low": 1,
|
|
"num_trigger_high": 7,
|
|
"exp_time_low": 1,
|
|
"exp_time_high": 3,
|
|
"cycle_low": 1,
|
|
"cycle_high": 5,
|
|
}
|
|
},
|
|
metadata={},
|
|
)
|
|
mock_bragg.scaninfo.scan_msg = msg
|
|
for field in fields(dev.scan_parameter):
|
|
assert getattr(dev.scan_parameter, field.name) == None
|
|
dev._update_scan_parameter()
|
|
for field in fields(dev.scan_parameter):
|
|
assert getattr(dev.scan_parameter, field.name) == msg.content["info"]["kwargs"].get(
|
|
field.name, None
|
|
)
|
|
|
|
|
|
def test_kickoff_scan(mock_bragg):
|
|
dev = mock_bragg
|
|
dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.READY
|
|
dev.scan_control.scan_duration._read_pv.mock_data = 1
|
|
dev.scan_control.scan_start_timer._read_pv.mock_data = 0
|
|
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
|
|
status = dev.kickoff()
|
|
assert status.done is False
|
|
dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING
|
|
time.sleep(0.2)
|
|
assert status.done is True
|
|
assert dev.scan_control.scan_start_timer.get() == 1
|
|
|
|
dev.scan_control.scan_duration._read_pv.mock_data = 0
|
|
dev.scan_control.scan_start_timer._read_pv.mock_data = 0
|
|
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
|
|
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
|
|
status = dev.kickoff()
|
|
dev.kickoff()
|
|
assert dev.scan_control.scan_start_infinite.get() == 1
|
|
|
|
|
|
def test_complete(mock_bragg):
|
|
dev = mock_bragg
|
|
dev.scan_control.scan_done._read_pv.mock_data = 0
|
|
# Normal case
|
|
status = dev.complete()
|
|
assert status.done is False
|
|
assert status.success is False
|
|
dev.scan_control.scan_done._read_pv.mock_data = 1
|
|
time.sleep(0.2)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
|
|
# Stop called case
|
|
dev.scan_control.scan_done._read_pv.mock_data = 0
|
|
status = dev.complete()
|
|
assert status.done is False
|
|
assert status.success is False
|
|
dev.stop()
|
|
time.sleep(0.2)
|
|
assert status.done is True
|
|
assert status.success is False
|
|
|
|
|
|
def test_unstage(mock_bragg):
|
|
mock_bragg.timeout_for_pvwait = 0.5
|
|
mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
|
|
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
|
|
|
|
with mock.patch.object(mock_bragg.scan_control.scan_val_reset, "put") as mock_put:
|
|
mock_bragg.unstage()
|
|
assert mock_put.call_count == 0
|
|
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
|
|
with pytest.raises(TimeoutError):
|
|
mock_bragg.unstage()
|
|
assert mock_put.call_count == 1
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"msg",
|
|
[
|
|
ScanQueueMessage(
|
|
scan_type="monitor_scan",
|
|
parameter={
|
|
"args": {},
|
|
"kwargs": {"device": "mo1_bragg", "start": 0, "stop": 10, "relative": True},
|
|
"num_points": 100,
|
|
},
|
|
queue="primary",
|
|
metadata={"RID": "test1234"},
|
|
),
|
|
ScanQueueMessage(
|
|
scan_type="xas_simple_scan",
|
|
parameter={
|
|
"args": {},
|
|
"kwargs": {
|
|
"motor": "mo1_bragg",
|
|
"start": 0,
|
|
"stop": 10,
|
|
"scan_time": 1,
|
|
"scan_duration": 10,
|
|
},
|
|
"num_points": 100,
|
|
},
|
|
queue="primary",
|
|
metadata={"RID": "test1234"},
|
|
),
|
|
ScanQueueMessage(
|
|
scan_type="xas_simple_scan_with_xrd",
|
|
parameter={
|
|
"args": {},
|
|
"kwargs": {
|
|
"motor": "mo1_bragg",
|
|
"start": 0,
|
|
"stop": 10,
|
|
"scan_time": 1,
|
|
"scan_duration": 10,
|
|
"xrd_enable_low": True,
|
|
"xrd_enable_high": False,
|
|
"num_trigger_low": 1,
|
|
"num_trigger_high": 7,
|
|
"exp_time_low": 1,
|
|
"exp_time_high": 3,
|
|
"cycle_low": 1,
|
|
"cycle_high": 5,
|
|
},
|
|
"num_points": 10,
|
|
},
|
|
queue="primary",
|
|
metadata={"RID": "test1234"},
|
|
),
|
|
],
|
|
)
|
|
def test_stage(mock_bragg, scan_worker_mock, msg):
|
|
"""This test is important to check that the stage method of the device is working correctly.
|
|
Changing the kwargs names in the scans is tightly linked to the logic on the device, thus
|
|
it is important to check that the stage method is working correctly for the current implementation.
|
|
|
|
Therefor, this test creates a scaninfo message using the scan.open_scan() method to always check
|
|
agains the currently implemented scans vs. the logic on the device"""
|
|
# Create a scaninfo message using scans the ScanQueueMessages above, 3 cases of fly scan; for the general case the procedure is not defined yet
|
|
worker = scan_worker_mock
|
|
scan_server = worker.parent
|
|
rb = RequestBlock(msg, assembler=ScanAssembler(parent=scan_server))
|
|
with mock.patch.object(worker, "current_instruction_queue_item"):
|
|
worker.scan_motors = []
|
|
worker.readout_priority = {
|
|
"monitored": [],
|
|
"baseline": [],
|
|
"async": [],
|
|
"continuous": [],
|
|
"on_request": [],
|
|
}
|
|
open_scan_msg = list(rb.scan.open_scan())[0]
|
|
worker._initialize_scan_info(rb, open_scan_msg, msg.content["parameter"].get("num_points"))
|
|
scan_status_msg = ScanStatusMessage(
|
|
scan_id="test1234", status="closed", info=worker.current_scan_info, metadata={}
|
|
)
|
|
mock_bragg.scaninfo.scan_msg = scan_status_msg
|
|
|
|
# Ensure that ScanControlLoadMessage is set to SUCCESS
|
|
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
|
|
with (
|
|
mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata,
|
|
mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg,
|
|
mock.patch.object(mock_bragg, "on_unstage"),
|
|
):
|
|
scan_name = scan_status_msg.content["info"].get("scan_name", "")
|
|
# Chek the not implemented fly scan first, should raise Mo1BraggError
|
|
if scan_name not in ["xas_simple_scan", "xas_simple_scan_with_xrd"]:
|
|
with pytest.raises(Mo1BraggError):
|
|
mock_bragg.stage()
|
|
assert mock_check_scan_msg.call_count == 1
|
|
assert mock_load_scan_metadata.call_count == 1
|
|
else:
|
|
with (
|
|
mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings,
|
|
mock.patch.object(mock_bragg, "set_xrd_settings") as mock_xrd_settings,
|
|
mock.patch.object(
|
|
mock_bragg, "set_scan_control_settings"
|
|
) as mock_set_scan_control_settings,
|
|
):
|
|
# Check xas_simple_scan
|
|
if scan_name == "xas_simple_scan":
|
|
mock_bragg.stage()
|
|
assert mock_xas_settings.call_args == mock.call(
|
|
low=scan_status_msg.content["info"]["kwargs"]["start"],
|
|
high=scan_status_msg.content["info"]["kwargs"]["stop"],
|
|
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
|
|
)
|
|
assert mock_xrd_settings.call_args == mock.call(
|
|
enable_low=False,
|
|
enable_high=False,
|
|
num_trigger_low=0,
|
|
num_trigger_high=0,
|
|
exp_time_low=0,
|
|
exp_time_high=0,
|
|
cycle_low=0,
|
|
cycle_high=0,
|
|
)
|
|
assert mock_set_scan_control_settings.call_args == mock.call(
|
|
mode=ScanControlMode.SIMPLE,
|
|
scan_duration=scan_status_msg.content["info"]["kwargs"][
|
|
"scan_duration"
|
|
],
|
|
)
|
|
# Check xas_simple_scan_with_xrd
|
|
elif scan_name == "xas_simple_scan_with_xrd":
|
|
mock_bragg.stage()
|
|
assert mock_xas_settings.call_args == mock.call(
|
|
low=scan_status_msg.content["info"]["kwargs"]["start"],
|
|
high=scan_status_msg.content["info"]["kwargs"]["stop"],
|
|
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
|
|
)
|
|
assert mock_xrd_settings.call_args == mock.call(
|
|
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
|
|
enable_high=scan_status_msg.content["info"]["kwargs"][
|
|
"xrd_enable_high"
|
|
],
|
|
num_trigger_low=scan_status_msg.content["info"]["kwargs"][
|
|
"num_trigger_low"
|
|
],
|
|
num_trigger_high=scan_status_msg.content["info"]["kwargs"][
|
|
"num_trigger_high"
|
|
],
|
|
exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
|
|
exp_time_high=scan_status_msg.content["info"]["kwargs"][
|
|
"exp_time_high"
|
|
],
|
|
cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"],
|
|
cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"],
|
|
)
|
|
assert mock_set_scan_control_settings.call_args == mock.call(
|
|
mode=ScanControlMode.SIMPLE,
|
|
scan_duration=scan_status_msg.content["info"]["kwargs"][
|
|
"scan_duration"
|
|
],
|
|
)
|