diff --git a/csaxs_bec/device_configs/bl_frontend.yaml b/csaxs_bec/device_configs/bl_frontend.yaml index 2c7eae3..0ab1ae9 100644 --- a/csaxs_bec/device_configs/bl_frontend.yaml +++ b/csaxs_bec/device_configs/bl_frontend.yaml @@ -203,10 +203,10 @@ bpm1: description: 'XBPM1 (frontend)' deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM deviceConfig: - blade_t: xbpm1c1 - blade_r: xbpm1c2 - blade_b: xbpm1c3 - blade_l: xbpm1c4 + left_top: xbpm1c1 + right_top: xbpm1c2 + right_bot: xbpm1c3 + left_bot: xbpm1c4 onFailure: raise enabled: true readoutPriority: monitored diff --git a/tests/tests_devices/test_pseudo_devices.py b/tests/tests_devices/test_pseudo_devices.py new file mode 100644 index 0000000..60c59a2 --- /dev/null +++ b/tests/tests_devices/test_pseudo_devices.py @@ -0,0 +1,241 @@ +"""Module to test the pseudo_device module.""" + +import pytest +from bec_lib.atlas_models import Device +from ophyd_devices.sim.sim_signals import SetableSignal + +from csaxs_bec.devices.pseudo_devices.bpm import BPM +from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl + + +@pytest.fixture +def patched_dm(dm_with_devices): + # Patch missing current_session attribute in the device manager + dm = dm_with_devices + setattr(dm, "current_session", dm._session) + # + signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config") + signal_mid = SetableSignal(name="gain_mid", value=0, kind="config") + signal_msb = SetableSignal(name="gain_msb", value=0, kind="config") + signal_coupling = SetableSignal(name="coupling", value=0, kind="config") + signal_speed = SetableSignal(name="speed_mode", value=0, kind="config") + for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]: + dev_cfg = Device( + name=signal.name, + deviceClass="ophyd_devices.sim.sim_signals.SetableSignal", + enabled=True, + readoutPriority="baseline", + ) + dm._session["devices"].append(dev_cfg.model_dump()) + dm.devices._add_device(signal.name, signal) + return dm + + +@pytest.fixture +def bpm_control(patched_dm): + name = "bpm_control" + control_config = Device( + name=name, + deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl", + enabled=True, + readoutPriority="baseline", + deviceConfig={ + "gain_lsb": "gain_lsb", + "gain_mid": "gain_mid", + "gain_msb": "gain_msb", + "coupling": "coupling", + "speed_mode": "speed_mode", + }, + needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"], + ) + patched_dm._session["devices"].append(control_config.model_dump()) + try: + control = BPMControl( + name=name, + gain_lsb="gain_lsb", + gain_mid="gain_mid", + gain_msb="gain_msb", + coupling="coupling", + speed_mode="speed_mode", + device_manager=patched_dm, + ) + patched_dm.devices._add_device(control.name, control) + control.wait_for_connection() + yield control + finally: + control.destroy() + + +def test_bpm_control_set_gain(bpm_control): + gain_lsb = bpm_control.device_manager.devices["gain_lsb"] + gain_mid = bpm_control.device_manager.devices["gain_mid"] + gain_msb = bpm_control.device_manager.devices["gain_msb"] + coupling = bpm_control.device_manager.devices["coupling"] + speed_mode = bpm_control.device_manager.devices["speed_mode"] + gain_lsb.put(0) + gain_mid.put(0) + gain_msb.put(0) + coupling.put(0) + speed_mode.put(1) + + gain = bpm_control.gain.get() + assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1) + + gain_val = 10000000 + bpm_control.set_gain(gain_val) + assert _GAIN_TO_BITS.get(gain_val, ()) == ( + gain_msb.get(), + gain_mid.get(), + gain_lsb.get(), + speed_mode.get(), + ) + + gain_val = 100000000000 + bpm_control.set_gain(gain_val) + assert _GAIN_TO_BITS.get(gain_val, ()) == ( + gain_msb.get(), + gain_mid.get(), + gain_lsb.get(), + speed_mode.get(), + ) + + with pytest.raises(ValueError): + bpm_control.set_gain(1005.0) + + +def test_bpm_control_set_coupling(bpm_control): + coupling = bpm_control.device_manager.devices["coupling"] + coupling.put(0) + + bpm_control.coupling.get() == "AC" + coupling.put(1) + bpm_control.coupling.get() == "DC" + + bpm_control.set_coupling("AC") + assert coupling.get() == 0 + + with pytest.raises(ValueError): + bpm_control.set_coupling("wrong") + + +@pytest.fixture +def patched_dm_bpm(dm_with_devices): + # Patch missing current_session attribute in the device manager + dm = dm_with_devices + setattr(dm, "current_session", dm._session) + # + left_top = SetableSignal(name="left_top", value=0, kind="config") + right_top = SetableSignal(name="right_top", value=0, kind="config") + right_bot = SetableSignal(name="right_bot", value=0, kind="config") + left_bot = SetableSignal(name="left_bot", value=0, kind="config") + for signal in [left_top, right_top, right_bot, left_bot]: + + dev_cfg = Device( + name=signal.name, + deviceClass="ophyd_devices.sim.sim_signals.SetableSignal", + enabled=True, + readoutPriority="baseline", + ) + dm._session["devices"].append(dev_cfg.model_dump()) + dm.devices._add_device(signal.name, signal) + return dm + + +@pytest.fixture +def bpm(patched_dm_bpm): + name = "bpm" + bpm_config = Device( + name=name, + deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM", + enabled=True, + readoutPriority="baseline", + deviceConfig={ + "left_top": "left_top", + "right_top": "right_top", + "right_bot": "right_bot", + "left_bot": "left_bot", + }, + needs=["left_top", "right_top", "right_bot", "left_bot"], + ) + patched_dm_bpm._session["devices"].append(bpm_config.model_dump()) + try: + bpm = BPM( + name=name, + left_top="left_top", + right_top="right_top", + right_bot="right_bot", + left_bot="left_bot", + device_manager=patched_dm_bpm, + ) + patched_dm_bpm.devices._add_device(bpm.name, bpm) + bpm.wait_for_connection() + yield bpm + finally: + bpm.destroy() + + +def test_bpm_positions(bpm): + left_top = bpm.device_manager.devices["left_top"] + right_top = bpm.device_manager.devices["right_top"] + right_bot = bpm.device_manager.devices["right_bot"] + left_bot = bpm.device_manager.devices["left_bot"] + + # Test center position + for signal in [left_top, right_top, right_bot, left_bot]: + signal.put(1) + assert bpm.pos_x.get() == 0 + assert bpm.pos_y.get() == 0 + + # Test fully left + left_top.put(1) + right_top.put(0) + right_bot.put(0) + left_bot.put(1) + assert bpm.pos_x.get() == -1 + assert bpm.pos_y.get() == 0 + assert bpm.diagonal.get() == 0 + assert bpm.intensity.get() == 2 + + # Test fully right + left_top.put(0) + right_top.put(1) + right_bot.put(1) + left_bot.put(0) + assert bpm.pos_x.get() == 1 + assert bpm.pos_y.get() == 0 + assert bpm.diagonal.get() == 0 + + # Test fully top + left_top.put(1) + right_top.put(1) + right_bot.put(0) + left_bot.put(0) + assert bpm.pos_x.get() == 0 + assert bpm.pos_y.get() == 1 + assert bpm.diagonal.get() == 0 + + # Test fully bottom + left_top.put(0) + right_top.put(0) + right_bot.put(1) + left_bot.put(1) + assert bpm.pos_x.get() == 0 + assert bpm.pos_y.get() == -1 + assert bpm.diagonal.get() == 0 + + # Diagonal beam + left_top.put(1) + right_top.put(0) + right_bot.put(1) + left_bot.put(0) + assert bpm.pos_x.get() == 0 + assert bpm.pos_y.get() == 0 + assert bpm.diagonal.get() == -1 + + left_top.put(0) + right_top.put(1) + right_bot.put(0) + left_bot.put(1) + assert bpm.pos_x.get() == 0 + assert bpm.pos_y.get() == 0 + assert bpm.diagonal.get() == 1