480 lines
17 KiB
Python
480 lines
17 KiB
Python
# pylint: skip-file
|
|
import threading
|
|
from unittest import mock
|
|
|
|
import numpy as np
|
|
import ophyd
|
|
import pytest
|
|
from bec_lib import messages
|
|
from bec_lib.endpoints import MessageEndpoints
|
|
from bec_server.device_server.tests.utils import DMMock
|
|
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
|
|
|
from csaxs_bec.devices.epics.mcs_card.mcs_card import (
|
|
ACQUIREMODE,
|
|
ACQUIRING,
|
|
CHANNEL1SOURCE,
|
|
CHANNELADVANCE,
|
|
INPUTMODE,
|
|
OUTPUTMODE,
|
|
POLARITY,
|
|
READMODE,
|
|
MCSCard,
|
|
)
|
|
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import READYTOREAD, MCSCardCSAXS
|
|
|
|
|
|
@pytest.fixture(scope="function")
def mock_mcs_card():
    """Fixture to mock the MCSCard device.

    While the fixture is active ``ophyd.cl`` is replaced by a mock whose
    ``get_pv`` is ``MockPV``. The device is created inside that patch,
    passed through ``patch_dual_pvs`` and then yielded to the test.

    Yields:
        MCSCard: device named "mcs_card" with prefix "X12SA-MCS:".
    """
    name = "mcs_card"
    prefix = "X12SA-MCS:"
    with mock.patch.object(ophyd, "cl") as mock_cl:
        mock_cl.get_pv = MockPV
        # thread_class has to be the Thread class itself; the bare
        # ``threading`` module is not callable and would fail as soon as
        # something tries to start a thread through the control layer.
        mock_cl.thread_class = threading.Thread
        mcs_card = MCSCard(name=name, prefix=prefix)
        patch_dual_pvs(mcs_card)
        yield mcs_card
|
|
|
|
|
|
def test_mcs_card(mock_mcs_card):
    """Check name, prefix and counter layout of a freshly created MCSCard."""
    card = mock_mcs_card

    # Identity of the device
    assert card.name == "mcs_card"
    assert card.prefix == "X12SA-MCS:"

    # The counters sub-device is expected to expose 32 components
    counters = card.counters
    assert len(counters.component_names) == 32
    assert counters.mca1.name == "mcs_card_counters_mca1"
|
|
|
|
|
|
@pytest.fixture(scope="function")
def mock_mcs_csaxs():
    """Provide an MCSCardCSAXS instance running against mocked PVs.

    A ``DMMock`` is used as device manager. ``ophyd.cl`` stays patched
    (``get_pv`` -> ``MockPV``, ``thread_class`` -> ``threading.Thread``)
    for as long as the fixture is in use.
    """
    device_manager = DMMock()
    with mock.patch.object(ophyd, "cl") as patched_cl:
        patched_cl.get_pv = MockPV
        patched_cl.thread_class = threading.Thread
        device = MCSCardCSAXS(
            name="mcs_csaxs", prefix="X12SA-MCS-CSAXS:", device_manager=device_manager
        )
        patch_dual_pvs(device)
        yield device
|
|
|
|
|
|
def test_mcs_card_csaxs(mock_mcs_csaxs):
    """Check name, prefix, counter mapping and clock of a new MCSCardCSAXS."""
    device = mock_mcs_csaxs
    # Counter signal name -> name the value is mapped to
    expected_mapping = {
        "mcs_csaxs_counters_mca1": "current1",
        "mcs_csaxs_counters_mca2": "current2",
        "mcs_csaxs_counters_mca3": "current3",
        "mcs_csaxs_counters_mca4": "current4",
        "mcs_csaxs_counters_mca5": "count_time",
    }

    assert device.name == "mcs_csaxs"
    assert device.prefix == "X12SA-MCS-CSAXS:"
    assert device.counter_mapping == expected_mapping
    assert device._mcs_clock == 1e7  # 10 MHz
|
|
|
|
|
|
def test_mcs_card_csaxs_on_connected(mock_mcs_csaxs):
    """Check the signal values present after MCSCardCSAXS.on_connected().

    The configuration signals are read back one by one and compared with
    their expected values. A second call with a patched ``subscribe`` on
    mca1 verifies the arguments the subscription was last called with.
    """
    card = mock_mcs_csaxs
    card.on_connected()

    # (signal, expected read-back value), checked in this order
    expected_settings = [
        # Stop called
        (card.stop_all, 1),
        # Channel advance settings
        (card.channel_advance, CHANNELADVANCE.EXTERNAL),
        (card.channel1_source, CHANNEL1SOURCE.EXTERNAL),
        (card.prescale, 1),
        # User LED
        (card.user_led, 0),
        # Only 5 channels are connected
        (card.mux_output, 5),
        # Input/output settings
        (card.input_mode, INPUTMODE.MODE_3),
        (card.input_polarity, POLARITY.NORMAL),
        (card.output_mode, OUTPUTMODE.MODE_2),
        (card.output_polarity, POLARITY.NORMAL),
        (card.count_on_start, 0),
        (card.read_mode, READMODE.PASSIVE),
        (card.acquire_mode, ACQUIREMODE.MCS),
    ]
    for signal, expected in expected_settings:
        assert signal.get() == expected

    # Subscription of the counter callback on mca1
    with mock.patch.object(card.counters.mca1, "subscribe") as subscribe_mock:
        card.on_connected()
        expected_call = mock.call(card._on_counter_update, run=False)
        assert subscribe_mock.call_args == expected_call
|
|
|
|
|
|
def test_mcs_card_csaxs_stage(mock_mcs_csaxs):
    """Stage MCSCardCSAXS and check the signals read back afterwards."""
    card = mock_mcs_csaxs
    frames_per_trigger = 5
    card.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
    # Start from 0 so the erase_all check below reflects a write made during stage()
    card.erase_all.put(0)

    card.stage()

    assert card._staged == ophyd.Staged.yes
    assert card.erase_all.get() == 1
    assert card.preset_real.get() == 0
    assert card.num_use_all.get() == frames_per_trigger
|
|
|
|
|
|
def test_mcs_card_csaxs_unstage(mock_mcs_csaxs):
    """Unstage MCSCardCSAXS and check stop_all, ready_to_read and erase_all."""
    card = mock_mcs_csaxs
    # Preset every signal to a value different from the one expected after
    # unstage(), so each assertion below reflects a change made by the call.
    card.stop_all.put(0)
    card.ready_to_read.put(0)
    card.erase_all.put(1)

    card.unstage()

    assert card.stop_all.get() == 1
    assert card.ready_to_read.get() == READYTOREAD.DONE
    assert card.erase_all.get() == 0
|
|
|
|
|
|
def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs):
    """Check that stop() fails the status returned by MCSCardCSAXS.complete()."""
    card = mock_mcs_csaxs
    # Report an ongoing acquisition; the status from complete() must stay pending
    card.acquiring._read_pv.mock_data = ACQUIRING.ACQUIRING
    status = card.complete()
    assert status.done is False

    # Preset the signals that are checked after stop()
    card.stop_all.put(0)
    card.ready_to_read.put(READYTOREAD.PROCESSING)
    card.stop()

    # Waiting on the status is expected to raise
    with pytest.raises(Exception):
        status.wait(timeout=3)

    assert status.done is True
    assert status.success is False
    assert card.stop_all.get() == 1
    assert card.ready_to_read.get() == READYTOREAD.DONE
|
|
|
|
|
|
def test_mcs_card_csaxs_on_counter_updated(mock_mcs_csaxs):
    """Drive _on_counter_update for mca1..mca5 and check the resulting signals.

    For mca1 to mca4 the raw value, the matching bpm current and the
    growing ``counter_updated`` list are checked. For mca5 the count time
    and the raw value are checked.
    """
    card = mock_mcs_csaxs
    counters = card.counters
    # Names of the counters that have reported so far, in call order
    seen = []

    # mca1: scalar value
    card._on_counter_update(1, obj=counters.mca1)
    assert card.mcs.mca1.get() == 1
    assert card.bpm.current1.get() == 1
    seen.append(counters.mca1.name)
    assert card.counter_updated == seen

    # mca2: array value, current2 is expected close to 3
    card._on_counter_update(np.array([2, 4]), obj=counters.mca2)
    assert card.mcs.mca2.get() == [2, 4]
    assert np.isclose(card.bpm.current2.get(), 3)
    seen.append(counters.mca2.name)
    assert card.counter_updated == seen

    # mca3: scalar value
    card._on_counter_update(1000, obj=counters.mca3)
    assert card.mcs.mca3.get() == 1000
    assert card.bpm.current3.get() == 1000
    seen.append(counters.mca3.name)
    assert card.counter_updated == seen

    # mca4: array value, current4 is expected close to 30
    card._on_counter_update(np.array([20, 40]), obj=counters.mca4)
    assert card.mcs.mca4.get() == [20, 40]
    assert np.isclose(card.bpm.current4.get(), 30)
    seen.append(counters.mca4.name)
    assert card.counter_updated == seen

    # mca5: ready_to_read is still 0 before the last counter reports
    assert card.ready_to_read.get() == 0
    card._on_counter_update(np.array([10000, 10000]), obj=counters.mca5)
    assert np.isclose(card.bpm.count_time.get(), 10000 / 1e7)
    assert card.mcs.mca5.get() == [10000, 10000]
|
|
|
|
|
|
# @pytest.fixture(scope="function")
|
|
# def mock_det():
|
|
# name = "mcs"
|
|
# prefix = "X12SA-MCS:"
|
|
# dm = DMMock()
|
|
# with mock.patch.object(dm, "connector"):
|
|
# with (
|
|
# mock.patch(
|
|
# "ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"
|
|
# ) as filemixin,
|
|
# mock.patch(
|
|
# "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
|
|
# ) as mock_service_config,
|
|
# ):
|
|
# with mock.patch.object(ophyd, "cl") as mock_cl:
|
|
# mock_cl.get_pv = MockPV
|
|
# mock_cl.thread_class = threading.Thread
|
|
# with mock.patch.object(MCScSAXS, "_init"):
|
|
# det = MCScSAXS(name=name, prefix=prefix, device_manager=dm)
|
|
# patch_dual_pvs(det)
|
|
# det.TIMEOUT_FOR_SIGNALS = 0.1
|
|
# yield det
|
|
|
|
|
|
# def test_init():
|
|
# """Test the _init function:"""
|
|
# name = "eiger"
|
|
# prefix = "X12SA-ES-EIGER9M:"
|
|
# dm = DMMock()
|
|
# with mock.patch.object(dm, "connector"):
|
|
# with (
|
|
# mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"),
|
|
# mock.patch(
|
|
# "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
|
|
# ),
|
|
# ):
|
|
# with mock.patch.object(ophyd, "cl") as mock_cl:
|
|
# mock_cl.get_pv = MockPV
|
|
# with (
|
|
# mock.patch(
|
|
# "csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_detector"
|
|
# ) as mock_init_det,
|
|
# mock.patch(
|
|
# "csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_detector_backend"
|
|
# ) as mock_init_backend,
|
|
# ):
|
|
# MCScSAXS(name=name, prefix=prefix, device_manager=dm)
|
|
# mock_init_det.assert_called_once()
|
|
# mock_init_backend.assert_called_once()
|
|
|
|
|
|
# @pytest.mark.parametrize(
|
|
# "trigger_source, channel_advance, channel_source1, pv_channels",
|
|
# [
|
|
# (
|
|
# 3,
|
|
# 1,
|
|
# 0,
|
|
# {
|
|
# "user_led": 0,
|
|
# "mux_output": 5,
|
|
# "input_pol": 0,
|
|
# "output_pol": 1,
|
|
# "count_on_start": 0,
|
|
# "stop_all": 1,
|
|
# },
|
|
# )
|
|
# ],
|
|
# )
|
|
# def test_initialize_detector(
|
|
# mock_det, trigger_source, channel_advance, channel_source1, pv_channels
|
|
# ):
|
|
# """Test the _init function:
|
|
|
|
# This includes testing the functions:
|
|
# - initialize_detector
|
|
# - stop_det
|
|
# - parent.set_trigger
|
|
# --> Testing the filewriter is done in test_init_filewriter
|
|
|
|
# Validation upon setting the correct PVs
|
|
|
|
# """
|
|
# mock_det.custom_prepare.initialize_detector() # call the method you want to test
|
|
# assert mock_det.channel_advance.get() == channel_advance
|
|
# assert mock_det.channel1_source.get() == channel_source1
|
|
# assert mock_det.user_led.get() == pv_channels["user_led"]
|
|
# assert mock_det.mux_output.get() == pv_channels["mux_output"]
|
|
# assert mock_det.input_polarity.get() == pv_channels["input_pol"]
|
|
# assert mock_det.output_polarity.get() == pv_channels["output_pol"]
|
|
# assert mock_det.count_on_start.get() == pv_channels["count_on_start"]
|
|
# assert mock_det.input_mode.get() == trigger_source
|
|
|
|
|
|
# def test_trigger(mock_det):
|
|
# """Test the trigger function:
|
|
# Validate that trigger calls the custom_prepare.on_trigger() function
|
|
# """
|
|
# with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger:
|
|
# mock_det.trigger()
|
|
# mock_on_trigger.assert_called_once()
|
|
|
|
|
|
# @pytest.mark.parametrize(
|
|
# "value, num_lines, num_points, done", [(100, 5, 500, False), (500, 5, 500, True)]
|
|
# )
|
|
# def test_progress_update(mock_det, value, num_lines, num_points, done):
|
|
# mock_det.num_lines.set(num_lines)
|
|
# mock_det.scaninfo.num_points = num_points
|
|
# calls = mock.call(sub_type="progress", value=value, max_value=num_points, done=done)
|
|
# with mock.patch.object(mock_det, "_run_subs") as mock_run_subs:
|
|
# mock_det.custom_prepare._progress_update(value=value)
|
|
# mock_run_subs.assert_called_once()
|
|
# assert mock_run_subs.call_args == calls
|
|
|
|
|
|
# @pytest.mark.parametrize(
|
|
# "values, expected_nothing",
|
|
# [([[100, 120, 140], [200, 220, 240], [300, 320, 340]], False), ([100, 200, 300], True)],
|
|
# )
|
|
# def test_on_mca_data(mock_det, values, expected_nothing):
|
|
# """Test the on_mca_data function:
|
|
# Validate that on_mca_data calls the custom_prepare.on_mca_data() function
|
|
# """
|
|
# with mock.patch.object(mock_det.custom_prepare, "_send_data_to_bec") as mock_send_data:
|
|
# mock_object = mock.MagicMock()
|
|
# for ii, name in enumerate(mock_det.custom_prepare.mca_names):
|
|
# mock_object.attr_name = name
|
|
# mock_det.custom_prepare._on_mca_data(obj=mock_object, value=values[ii])
|
|
# if not expected_nothing and ii < (len(values) - 1):
|
|
# assert mock_det.custom_prepare.mca_data[name] == values[ii]
|
|
|
|
# if not expected_nothing:
|
|
# mock_send_data.assert_called_once()
|
|
# assert mock_det.custom_prepare.acquisition_done is True
|
|
|
|
|
|
# @pytest.mark.parametrize(
|
|
# "metadata, mca_data",
|
|
# [
|
|
# (
|
|
# {"scan_id": 123},
|
|
# {
|
|
# "mca1": {"value": [100, 120, 140]},
|
|
# "mca3": {"value": [200, 220, 240]},
|
|
# "mca4": {"value": [300, 320, 340]},
|
|
# },
|
|
# )
|
|
# ],
|
|
# )
|
|
# def test_send_data_to_bec(mock_det, metadata, mca_data):
|
|
# mock_det.scaninfo.scan_msg = mock.MagicMock()
|
|
# mock_det.scaninfo.scan_msg.metadata = metadata
|
|
# mock_det.scaninfo.scan_id = metadata["scan_id"]
|
|
# mock_det.custom_prepare.mca_data = mca_data
|
|
# mock_det.custom_prepare._send_data_to_bec()
|
|
# device_metadata = mock_det.scaninfo.scan_msg.metadata
|
|
# metadata.update({"async_update": "append", "num_lines": mock_det.num_lines.get()})
|
|
# data = messages.DeviceMessage(signals=dict(mca_data), metadata=device_metadata)
|
|
# calls = mock.call(
|
|
# topic=MessageEndpoints.device_async_readback(
|
|
# scan_id=metadata["scan_id"], device=mock_det.name
|
|
# ),
|
|
# msg={"data": data},
|
|
# expire=1800,
|
|
# )
|
|
|
|
# assert mock_det.connector.xadd.call_args == calls
|
|
|
|
|
|
# @pytest.mark.parametrize(
|
|
# "scaninfo, triggersource, stopped, expected_exception",
|
|
# [
|
|
# (
|
|
# {"num_points": 500, "frames_per_trigger": 1, "scan_type": "step"},
|
|
# TriggerSource.MODE3,
|
|
# False,
|
|
# False,
|
|
# ),
|
|
# (
|
|
# {"num_points": 500, "frames_per_trigger": 1, "scan_type": "fly"},
|
|
# TriggerSource.MODE3,
|
|
# False,
|
|
# False,
|
|
# ),
|
|
# (
|
|
# {"num_points": 5001, "frames_per_trigger": 2, "scan_type": "step"},
|
|
# TriggerSource.MODE3,
|
|
# False,
|
|
# True,
|
|
# ),
|
|
# (
|
|
# {"num_points": 500, "frames_per_trigger": 2, "scan_type": "random"},
|
|
# TriggerSource.MODE3,
|
|
# False,
|
|
# True,
|
|
# ),
|
|
# ],
|
|
# )
|
|
# def test_stage(mock_det, scaninfo, triggersource, stopped, expected_exception):
|
|
# mock_det.scaninfo.num_points = scaninfo["num_points"]
|
|
# mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"]
|
|
# mock_det.scaninfo.scan_type = scaninfo["scan_type"]
|
|
# mock_det.stopped = stopped
|
|
# with mock.patch.object(mock_det.custom_prepare, "prepare_detector_backend") as mock_prep_fw:
|
|
# if expected_exception:
|
|
# with pytest.raises(MCSError):
|
|
# mock_det.stage()
|
|
# mock_prep_fw.assert_called_once()
|
|
# else:
|
|
# mock_det.stage()
|
|
# mock_prep_fw.assert_called_once()
|
|
# # Check set_trigger
|
|
# mock_det.input_mode.get() == triggersource
|
|
# if scaninfo["scan_type"] == "step":
|
|
# assert mock_det.num_use_all.get() == int(scaninfo["frames_per_trigger"]) * int(
|
|
# scaninfo["num_points"]
|
|
# )
|
|
# elif scaninfo["scan_type"] == "fly":
|
|
# assert mock_det.num_use_all.get() == int(scaninfo["num_points"])
|
|
# mock_det.preset_real.get() == 0
|
|
|
|
# # # Check custom_prepare.arm_acquisition
|
|
# # assert mock_det.custom_prepare.counter == 0
|
|
# # assert mock_det.erase_start.get() == 1
|
|
# # mock_prep_fw.assert_called_once()
|
|
# # # Check _prep_det
|
|
# # assert mock_det.cam.num_images.get() == int(
|
|
# # scaninfo["num_points"] * scaninfo["frames_per_trigger"]
|
|
# # )
|
|
# # assert mock_det.cam.num_frames.get() == 1
|
|
|
|
# # mock_publish_file_location.assert_called_with(done=False)
|
|
# # assert mock_det.cam.acquire.get() == 1
|
|
|
|
|
|
# def test_prepare_detector_backend(mock_det):
|
|
# mock_det.custom_prepare.prepare_detector_backend()
|
|
# assert mock_det.erase_all.get() == 1
|
|
# assert mock_det.read_mode.get() == ReadoutMode.EVENT
|
|
|
|
|
|
# def test_complete(mock_det):
|
|
# with (mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,):
|
|
# mock_det.complete()
|
|
# assert mock_finished.call_count == 1
|
|
|
|
|
|
# def test_stop_detector_backend(mock_det):
|
|
# mock_det.custom_prepare.stop_detector_backend()
|
|
# assert mock_det.custom_prepare.acquisition_done is True
|
|
|
|
|
|
# def test_stop(mock_det):
|
|
# with (
|
|
# mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det,
|
|
# mock.patch.object(
|
|
# mock_det.custom_prepare, "stop_detector_backend"
|
|
# ) as mock_stop_detector_backend,
|
|
# ):
|
|
# mock_det.stop()
|
|
# mock_stop_det.assert_called_once()
|
|
# mock_stop_detector_backend.assert_called_once()
|
|
# assert mock_det.stopped is True
|
|
|
|
|
|
# @pytest.mark.parametrize(
|
|
# "stopped, acquisition_done, acquiring_state, expected_exception",
|
|
# [
|
|
# (False, True, 0, False),
|
|
# (False, False, 0, True),
|
|
# (False, True, 1, True),
|
|
# (True, True, 0, True),
|
|
# ],
|
|
# )
|
|
# def test_finished(mock_det, stopped, acquisition_done, acquiring_state, expected_exception):
|
|
# mock_det.custom_prepare.acquisition_done = acquisition_done
|
|
# mock_det.acquiring._read_pv.mock_data = acquiring_state
|
|
# mock_det.scaninfo.num_points = 500
|
|
# mock_det.num_lines.put(500)
|
|
# mock_det.current_channel._read_pv.mock_data = 1
|
|
# mock_det.stopped = stopped
|
|
|
|
# if expected_exception:
|
|
# with pytest.raises(MCSTimeoutError):
|
|
# mock_det.timeout = 0.1
|
|
# mock_det.custom_prepare.finished()
|
|
# else:
|
|
# mock_det.custom_prepare.finished()
|
|
# if stopped:
|
|
# assert mock_det.stopped is stopped
|