test: add tests for bpm and bpm_control
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
CI for csaxs_bec / test (pull_request) Successful in 1m58s

This commit is contained in:
2026-03-27 20:00:42 +01:00
parent 9557d98f30
commit b07ac52371
2 changed files with 245 additions and 4 deletions

View File

@@ -203,10 +203,10 @@ bpm1:
description: 'XBPM1 (frontend)'
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
blade_t: xbpm1c1
blade_r: xbpm1c2
blade_b: xbpm1c3
blade_l: xbpm1c4
left_top: xbpm1c1
right_top: xbpm1c2
right_bot: xbpm1c3
left_bot: xbpm1c4
onFailure: raise
enabled: true
readoutPriority: monitored

View File

@@ -0,0 +1,241 @@
"""Module to test the pseudo_device module."""
import pytest
from bec_lib.atlas_models import Device
from ophyd_devices.sim.sim_signals import SetableSignal
from csaxs_bec.devices.pseudo_devices.bpm import BPM
from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl
@pytest.fixture
def patched_dm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config")
signal_mid = SetableSignal(name="gain_mid", value=0, kind="config")
signal_msb = SetableSignal(name="gain_msb", value=0, kind="config")
signal_coupling = SetableSignal(name="coupling", value=0, kind="config")
signal_speed = SetableSignal(name="speed_mode", value=0, kind="config")
for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm_control(patched_dm):
name = "bpm_control"
control_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"gain_lsb": "gain_lsb",
"gain_mid": "gain_mid",
"gain_msb": "gain_msb",
"coupling": "coupling",
"speed_mode": "speed_mode",
},
needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"],
)
patched_dm._session["devices"].append(control_config.model_dump())
try:
control = BPMControl(
name=name,
gain_lsb="gain_lsb",
gain_mid="gain_mid",
gain_msb="gain_msb",
coupling="coupling",
speed_mode="speed_mode",
device_manager=patched_dm,
)
patched_dm.devices._add_device(control.name, control)
control.wait_for_connection()
yield control
finally:
control.destroy()
def test_bpm_control_set_gain(bpm_control):
gain_lsb = bpm_control.device_manager.devices["gain_lsb"]
gain_mid = bpm_control.device_manager.devices["gain_mid"]
gain_msb = bpm_control.device_manager.devices["gain_msb"]
coupling = bpm_control.device_manager.devices["coupling"]
speed_mode = bpm_control.device_manager.devices["speed_mode"]
gain_lsb.put(0)
gain_mid.put(0)
gain_msb.put(0)
coupling.put(0)
speed_mode.put(1)
gain = bpm_control.gain.get()
assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1)
gain_val = 10000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
gain_val = 100000000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
with pytest.raises(ValueError):
bpm_control.set_gain(1005.0)
def test_bpm_control_set_coupling(bpm_control):
coupling = bpm_control.device_manager.devices["coupling"]
coupling.put(0)
bpm_control.coupling.get() == "AC"
coupling.put(1)
bpm_control.coupling.get() == "DC"
bpm_control.set_coupling("AC")
assert coupling.get() == 0
with pytest.raises(ValueError):
bpm_control.set_coupling("wrong")
@pytest.fixture
def patched_dm_bpm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
left_top = SetableSignal(name="left_top", value=0, kind="config")
right_top = SetableSignal(name="right_top", value=0, kind="config")
right_bot = SetableSignal(name="right_bot", value=0, kind="config")
left_bot = SetableSignal(name="left_bot", value=0, kind="config")
for signal in [left_top, right_top, right_bot, left_bot]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm(patched_dm_bpm):
name = "bpm"
bpm_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"left_top": "left_top",
"right_top": "right_top",
"right_bot": "right_bot",
"left_bot": "left_bot",
},
needs=["left_top", "right_top", "right_bot", "left_bot"],
)
patched_dm_bpm._session["devices"].append(bpm_config.model_dump())
try:
bpm = BPM(
name=name,
left_top="left_top",
right_top="right_top",
right_bot="right_bot",
left_bot="left_bot",
device_manager=patched_dm_bpm,
)
patched_dm_bpm.devices._add_device(bpm.name, bpm)
bpm.wait_for_connection()
yield bpm
finally:
bpm.destroy()
def test_bpm_positions(bpm):
left_top = bpm.device_manager.devices["left_top"]
right_top = bpm.device_manager.devices["right_top"]
right_bot = bpm.device_manager.devices["right_bot"]
left_bot = bpm.device_manager.devices["left_bot"]
# Test center position
for signal in [left_top, right_top, right_bot, left_bot]:
signal.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
# Test fully left
left_top.put(1)
right_top.put(0)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == -1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
assert bpm.intensity.get() == 2
# Test fully right
left_top.put(0)
right_top.put(1)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
# Test fully top
left_top.put(1)
right_top.put(1)
right_bot.put(0)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 1
assert bpm.diagonal.get() == 0
# Test fully bottom
left_top.put(0)
right_top.put(0)
right_bot.put(1)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == -1
assert bpm.diagonal.get() == 0
# Diagonal beam
left_top.put(1)
right_top.put(0)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == -1
left_top.put(0)
right_top.put(1)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 1