Files
debye_bec/tests/tests_devices/test_mo1_bragg.py

560 lines
23 KiB
Python

# pylint: skip-file
import os
import threading
import time
from dataclasses import fields
from unittest import mock
import ophyd
import pytest
from bec_lib.messages import ScanQueueMessage, ScanStatusMessage
from bec_server.scan_server.scan_assembler import ScanAssembler
from bec_server.scan_server.scan_queue import RequestBlock
from bec_server.scan_server.scan_worker import ScanWorker
from bec_server.scan_server.tests.fixtures import scan_server_mock
from ophyd.utils import LimitError
from ophyd_devices.tests.utils import MockPV
# from bec_server.device_server.tests.utils import DMMock
from debye_bec.devices.mo1_bragg.mo1_bragg import (
Mo1Bragg,
Mo1BraggError,
ScanControlLoadMessage,
ScanControlMode,
ScanControlScanStatus,
)
from debye_bec.devices.mo1_bragg.mo1_bragg_devices import MoveType
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
from debye_bec.devices.test_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def scan_worker_mock(scan_server_mock):
scan_server_mock.device_manager.connector = mock.MagicMock()
scan_worker = ScanWorker(parent=scan_server_mock)
yield scan_worker
@pytest.fixture(scope="function")
def mock_bragg():
name = "bragg"
prefix = "X01DA-OP-MO1:BRAGG:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Mo1Bragg(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_init(mock_bragg):
dev = mock_bragg
assert dev.name == "bragg"
assert dev.prefix == "X01DA-OP-MO1:BRAGG:"
assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV"
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs"
def test_check_value(mock_bragg):
dev = mock_bragg
dev.low_lim._read_pv.mock_data = 0
dev.high_lim._read_pv.mock_data = 1
# nothing happens
dev.check_value(0.5)
with pytest.raises(LimitError):
dev.check_value(15)
def test_egu(mock_bragg):
dev = mock_bragg
assert dev.egu == "eV"
def test_move_succeeds(mock_bragg):
dev = mock_bragg
dev.move_abs._read_pv.mock_data = 0
dev.motor_is_moving._read_pv.mock_data = 0
status = dev.move(0.5)
assert status.done is False
dev.motor_is_moving._read_pv.mock_data = 1
status.wait(timeout=3) # Callback should within that time
assert status.done is True
assert status.success is True
assert dev.setpoint.get() == 0.5
assert dev.move_abs.get() == 1
def test_stop_move(mock_bragg):
dev = mock_bragg
dev.move_abs._read_pv.mock_data = 0
dev.motor_is_moving._read_pv.mock_data = 0
# Move fails
status = dev.move(0.5)
time.sleep(1)
assert status.done is False
assert dev._stopped == False
dev.stop()
time.sleep(0.5)
assert dev._stopped == True
assert status.done is True
assert status.success is False
def test_set_xtal(mock_bragg):
dev = mock_bragg
dev.set_xtal("111")
# Default values for mock
assert dev.crystal.offset_si111.get() == 0
assert dev.crystal.offset_si311.get() == 0
assert dev.crystal.d_spacing_si111.get() == 0
assert dev.crystal.d_spacing_si311.get() == 0
assert dev.crystal.xtal_enum.get() == 0
dev.set_xtal("311", offset_si111=1, offset_si311=2, d_spacing_si111=3, d_spacing_si311=4)
assert dev.crystal.offset_si111.get() == 1
assert dev.crystal.offset_si311.get() == 2
assert dev.crystal.d_spacing_si111.get() == 3
assert dev.crystal.d_spacing_si311.get() == 4
assert dev.crystal.xtal_enum.get() == 1
def test_set_xas_settings(mock_bragg):
dev = mock_bragg
dev.set_xas_settings(low=0.5, high=1, scan_time=0.1)
assert dev.scan_settings.s_scan_energy_lo.get() == 0.5
assert dev.scan_settings.s_scan_energy_hi.get() == 1
assert dev.scan_settings.s_scan_scantime.get() == 0.1
def test_set_trig_settings(mock_bragg):
dev = mock_bragg
dev.set_trig_settings(
enable_low=True,
enable_high=False,
exp_time_high=0.1,
exp_time_low=0.01,
cycle_low=1,
cycle_high=3,
)
assert dev.scan_settings.trig_ena_lo_enum.get() == True
assert dev.scan_settings.trig_ena_hi_enum.get() == False
assert dev.scan_settings.trig_every_n_lo.get() == 1
assert dev.scan_settings.trig_every_n_hi.get() == 3
assert dev.scan_settings.trig_time_lo.get() == 0.01
assert dev.scan_settings.trig_time_hi.get() == 0.1
def test_set_control_settings(mock_bragg):
dev = mock_bragg
dev.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=10)
assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.SIMPLE
assert dev.scan_control.scan_duration.get() == 10
dev.set_scan_control_settings(mode=ScanControlMode.ADVANCED, scan_duration=5)
assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.ADVANCED
assert dev.scan_control.scan_duration.get() == 5
def test_update_scan_parameters(mock_bragg):
dev = mock_bragg
msg = ScanStatusMessage(
scan_id="my_scan_id",
status="closed",
request_inputs={
"inputs": {},
"kwargs": {
"start": 0,
"stop": 5,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
"xrd_enable_high": False,
"num_trigger_low": 1,
"num_trigger_high": 7,
"exp_time_low": 1,
"exp_time_high": 3,
"cycle_low": 1,
"cycle_high": 5,
"p_kink": 50,
"e_kink": 8000,
},
},
info={
"kwargs": {
"start": 0,
"stop": 5,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
"xrd_enable_high": False,
"num_trigger_low": 1,
"num_trigger_high": 7,
"exp_time_low": 1,
"exp_time_high": 3,
"cycle_low": 1,
"cycle_high": 5,
"p_kink": 50,
"e_kink": 8000,
}
},
metadata={},
)
mock_bragg.scan_info.msg = msg
scan_param = dev.scan_parameter.model_dump()
for _, v in scan_param.items():
assert v == None
dev._update_scan_parameter()
scan_param = dev.scan_parameter.model_dump()
for k, v in scan_param.items():
assert v == msg.content["request_inputs"]["kwargs"].get(k, None)
def test_kickoff_scan(mock_bragg):
dev = mock_bragg
dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.READY
dev.scan_control.scan_duration._read_pv.mock_data = 1
dev.scan_control.scan_start_timer._read_pv.mock_data = 0
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
status = dev.kickoff()
assert status.done is False
# TODO MockPV does not support callbacks yet, so we need to improve here #16
# dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING
# dev.scan_control.scan_status._read_pv.
# status.wait(timeout=3) # Callback should resolve now
# assert status.done is True
# # dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING
# time.sleep(0.2)
# assert status.done is True
assert dev.scan_control.scan_start_timer.get() == 1
dev.scan_control.scan_duration._read_pv.mock_data = 0
dev.scan_control.scan_start_timer._read_pv.mock_data = 0
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
status = dev.kickoff()
dev.kickoff()
assert dev.scan_control.scan_start_infinite.get() == 1
# FIXME #22 once mock_pv supports callbacks, high priority!
# def test_complete(mock_bragg):
# dev = mock_bragg
# dev.scan_control.scan_done._read_pv.mock_data = 0
# # Normal case
# status = dev.complete()
# assert status.done is False
# assert status.success is False
# dev.scan_control.scan_done._read_pv.mock_data = 1
# status.wait()
# # time.sleep(0.2)
# assert status.done is True
# assert status.success is True
# # Stop called case
# dev.scan_control.scan_done._read_pv.mock_data = 0
# status = dev.complete()
# assert status.done is False
# assert status.success is False
# dev.stop()
# time.sleep(0.2)
# assert status.done is True
# assert status.success is False
# FIXME #22 once mock_pv supports callbacks, high priority!
# def test_unstage(mock_bragg):
# mock_bragg.timeout_for_pvwait = 0.5
# mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
# mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
# with mock.patch.object(mock_bragg.scan_control.scan_val_reset, "put") as mock_put:
# status = mock_bragg.unstage()
# assert mock_put.call_count == 0
# mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
# with pytest.raises(TimeoutError):
# mock_bragg.unstage()
# assert mock_put.call_count == 1
# TODO reimplement the test for stage method
# @pytest.mark.parametrize(
# "msg",
# [
# ScanQueueMessage(
# scan_type="monitor_scan",
# parameter={
# "args": {},
# "kwargs": {
# "device": "mo1_bragg",
# "start": 0,
# "stop": 10,
# "relative": True,
# "system_config": {"file_suffix": None, "file_directory": None},
# },
# "num_points": 100,
# },
# queue="primary",
# metadata={"RID": "test1234"},
# ),
# ScanQueueMessage(
# scan_type="xas_simple_scan",
# parameter={
# "args": {},
# "kwargs": {
# "motor": "mo1_bragg",
# "start": 0,
# "stop": 10,
# "scan_time": 1,
# "scan_duration": 10,
# "system_config": {"file_suffix": None, "file_directory": None},
# },
# "num_points": 100,
# },
# queue="primary",
# metadata={"RID": "test1234"},
# ),
# ScanQueueMessage(
# scan_type="xas_simple_scan_with_xrd",
# parameter={
# "args": {},
# "kwargs": {
# "motor": "mo1_bragg",
# "start": 0,
# "stop": 10,
# "scan_time": 1,
# "scan_duration": 10,
# "xrd_enable_low": True,
# "xrd_enable_high": False,
# "num_trigger_low": 1,
# "num_trigger_high": 7,
# "exp_time_low": 1,
# "exp_time_high": 3,
# "cycle_low": 1,
# "cycle_high": 5,
# "system_config": {"file_suffix": None, "file_directory": None},
# },
# "num_points": 10,
# },
# queue="primary",
# metadata={"RID": "test1234"},
# ),
# ScanQueueMessage(
# scan_type="xas_advanced_scan",
# parameter={
# "args": {},
# "kwargs": {
# "motor": "mo1_bragg",
# "start": 8000,
# "stop": 9000,
# "scan_time": 1,
# "scan_duration": 10,
# "p_kink": 50,
# "e_kink": 8500,
# "system_config": {"file_suffix": None, "file_directory": None},
# },
# "num_points": 100,
# },
# queue="primary",
# metadata={"RID": "test1234"},
# ),
# ScanQueueMessage(
# scan_type="xas_advanced_scan_with_xrd",
# parameter={
# "args": {},
# "kwargs": {
# "motor": "mo1_bragg",
# "start": 8000,
# "stop": 9000,
# "scan_time": 1,
# "scan_duration": 10,
# "p_kink": 50,
# "e_kink": 8500,
# "xrd_enable_low": True,
# "xrd_enable_high": False,
# "num_trigger_low": 1,
# "num_trigger_high": 7,
# "exp_time_low": 1,
# "exp_time_high": 3,
# "cycle_low": 1,
# "cycle_high": 5,
# "system_config": {"file_suffix": None, "file_directory": None},
# },
# "num_points": 10,
# },
# queue="primary",
# metadata={"RID": "test1234"},
# ),
# ],
# )
# def test_stage(mock_bragg, scan_worker_mock, msg):
# """This test is important to check that the stage method of the device is working correctly.
# Changing the kwargs names in the scans is tightly linked to the logic on the device, thus
# it is important to check that the stage method is working correctly for the current implementation.
# Therefor, this test creates a scaninfo message using the scan.open_scan() method to always check
# agains the currently implemented scans vs. the logic on the device"""
# # Create a scaninfo message using scans the ScanQueueMessages above, 3 cases of fly scan; for the general case the procedure is not defined yet
# worker = scan_worker_mock
# scan_server = worker.parent
# rb = RequestBlock(msg, assembler=ScanAssembler(parent=scan_server))
# with mock.patch.object(worker, "current_instruction_queue_item"):
# worker.scan_motors = []
# worker.readout_priority = {
# "monitored": [],
# "baseline": [],
# "async": [],
# "continuous": [],
# "on_request": [],
# }
# open_scan_msg = list(rb.scan.open_scan())[0]
# worker._initialize_scan_info(
# rb, open_scan_msg, msg.content["parameter"].get("num_points", 1)
# )
# # TODO find a better solution to this...
# scan_status_msg = ScanStatusMessage(
# scan_id=worker.current_scan_id,
# status="open",
# scan_name=worker.current_scan_info.get("scan_name"),
# scan_number=worker.current_scan_info.get("scan_number"),
# session_id=worker.current_scan_info.get("session_id"),
# dataset_number=worker.current_scan_info.get("dataset_number"),
# num_points=worker.current_scan_info.get("num_points"),
# scan_type=worker.current_scan_info.get("scan_type"),
# scan_report_devices=worker.current_scan_info.get("scan_report_devices"),
# user_metadata=worker.current_scan_info.get("user_metadata"),
# readout_priority=worker.current_scan_info.get("readout_priority"),
# scan_parameters=worker.current_scan_info.get("scan_parameters"),
# request_inputs=worker.current_scan_info.get("request_inputs"),
# info=worker.current_scan_info,
# )
# mock_bragg.scan_info.msg = scan_status_msg
# # Ensure that ScanControlLoadMessage is set to SUCCESS
# mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
# with (
# mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg,
# mock.patch.object(mock_bragg, "on_unstage"),
# ):
# scan_name = scan_status_msg.content["info"].get("scan_name", "")
# # Chek the not implemented fly scan first, should raise Mo1BraggError
# if scan_name not in [
# "xas_simple_scan",
# "xas_simple_scan_with_xrd",
# "xas_advanced_scan",
# "xas_advanced_scan_with_xrd",
# ]:
# with pytest.raises(Mo1BraggError):
# mock_bragg.stage()
# assert mock_check_scan_msg.call_count == 1
# else:
# with (
# mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings,
# mock.patch.object(
# mock_bragg, "set_advanced_xas_settings"
# ) as mock_advanced_xas_settings,
# mock.patch.object(mock_bragg, "set_trig_settings") as mock_trig_settings,
# mock.patch.object(
# mock_bragg, "set_scan_control_settings"
# ) as mock_set_scan_control_settings,
# ):
# # Check xas_simple_scan
# if scan_name == "xas_simple_scan":
# mock_bragg.stage()
# assert mock_xas_settings.call_args == mock.call(
# low=scan_status_msg.content["info"]["kwargs"]["start"],
# high=scan_status_msg.content["info"]["kwargs"]["stop"],
# scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
# )
# assert mock_trig_settings.call_args == mock.call(
# enable_low=False,
# enable_high=False,
# exp_time_low=0,
# exp_time_high=0,
# cycle_low=0,
# cycle_high=0,
# )
# assert mock_set_scan_control_settings.call_args == mock.call(
# mode=ScanControlMode.SIMPLE,
# scan_duration=scan_status_msg.content["info"]["kwargs"][
# "scan_duration"
# ],
# )
# # Check xas_simple_scan_with_xrd
# elif scan_name == "xas_simple_scan_with_xrd":
# mock_bragg.stage()
# assert mock_xas_settings.call_args == mock.call(
# low=scan_status_msg.content["info"]["kwargs"]["start"],
# high=scan_status_msg.content["info"]["kwargs"]["stop"],
# scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
# )
# assert mock_trig_settings.call_args == mock.call(
# enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
# enable_high=scan_status_msg.content["info"]["kwargs"][
# "xrd_enable_high"
# ],
# exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
# exp_time_high=scan_status_msg.content["info"]["kwargs"][
# "exp_time_high"
# ],
# cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"],
# cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"],
# )
# assert mock_set_scan_control_settings.call_args == mock.call(
# mode=ScanControlMode.SIMPLE,
# scan_duration=scan_status_msg.content["info"]["kwargs"][
# "scan_duration"
# ],
# )
# # Check xas_advanced_scan
# elif scan_name == "xas_advanced_scan":
# mock_bragg.stage()
# assert mock_advanced_xas_settings.call_args == mock.call(
# low=scan_status_msg.content["info"]["kwargs"]["start"],
# high=scan_status_msg.content["info"]["kwargs"]["stop"],
# scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
# p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"],
# e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"],
# )
# assert mock_trig_settings.call_args == mock.call(
# enable_low=False,
# enable_high=False,
# exp_time_low=0,
# exp_time_high=0,
# cycle_low=0,
# cycle_high=0,
# )
# assert mock_set_scan_control_settings.call_args == mock.call(
# mode=ScanControlMode.ADVANCED,
# scan_duration=scan_status_msg.content["info"]["kwargs"][
# "scan_duration"
# ],
# )
# # Check xas_advanced_scan_with_xrd
# elif scan_name == "xas_advanced_scan_with_xrd":
# mock_bragg.stage()
# assert mock_advanced_xas_settings.call_args == mock.call(
# low=scan_status_msg.content["info"]["kwargs"]["start"],
# high=scan_status_msg.content["info"]["kwargs"]["stop"],
# scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
# p_kink=scan_status_msg.content["info"]["kwargs"]["p_kink"],
# e_kink=scan_status_msg.content["info"]["kwargs"]["e_kink"],
# )
# assert mock_trig_settings.call_args == mock.call(
# enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
# enable_high=scan_status_msg.content["info"]["kwargs"][
# "xrd_enable_high"
# ],
# exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
# exp_time_high=scan_status_msg.content["info"]["kwargs"][
# "exp_time_high"
# ],
# cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"],
# cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"],
# )
# assert mock_set_scan_control_settings.call_args == mock.call(
# mode=ScanControlMode.ADVANCED,
# scan_duration=scan_status_msg.content["info"]["kwargs"][
# "scan_duration"
# ],
# )