diff --git a/csaxs_bec/bec_ipython_client/plugins/cSAXS/cSAXSDLPCA200.py b/csaxs_bec/bec_ipython_client/plugins/cSAXS/cSAXSDLPCA200.py index 68f5414..aedfaaa 100644 --- a/csaxs_bec/bec_ipython_client/plugins/cSAXS/cSAXSDLPCA200.py +++ b/csaxs_bec/bec_ipython_client/plugins/cSAXS/cSAXSDLPCA200.py @@ -70,7 +70,7 @@ DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = { "rio_device": "galilrioesxbox", "description": "Beam Position Monitor 4 current amplifier", "channels": { - "gain_lsb": 0, # Pin 10 -> Galil ch0 + "gain_lsb": rio_optics.analog_in.ch0, # Pin 10 -> Galil ch0 "gain_mid": 1, # Pin 11 -> Galil ch1 "gain_msb": 2, # Pin 12 -> Galil ch2 "coupling": 3, # Pin 13 -> Galil ch3 diff --git a/csaxs_bec/device_configs/bl_endstation.yaml b/csaxs_bec/device_configs/bl_endstation.yaml index f038221..c798454 100644 --- a/csaxs_bec/device_configs/bl_endstation.yaml +++ b/csaxs_bec/device_configs/bl_endstation.yaml @@ -544,6 +544,66 @@ sl5trxt: # bl_smar_stage to use csaxs reference method. assign number according to axis channel bl_smar_stage: 5 +sl5ch: + description: ESbox1 slit 5 center horizontal + deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter + deviceConfig: + left_slit: sl5trxi + right_slit: sl5trxo + offset: 0 + enabled: true + onFailure: retry + readOnly: false + readoutPriority: baseline + needs: + - sl5trxi + - sl5trxo + +sl5wh: + description: ESbox1 slit 5 width horizontal + deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth + deviceConfig: + left_slit: sl5trxi + right_slit: sl5trxo + offset: 0 + enabled: true + onFailure: retry + readOnly: false + readoutPriority: baseline + needs: + - sl5trxi + - sl5trxo + +sl5cv: + description: ESbox1 slit 5 center vertical + deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter + deviceConfig: + left_slit: sl5trxb + right_slit: sl5trxt + offset: 0 + enabled: true + onFailure: retry + readOnly: false + readoutPriority: baseline + needs: + - sl5trxb + - sl5trxt + +sl5wv: + description: ESbox1 slit 5 width vertical + deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth + deviceConfig: + left_slit: sl5trxb + right_slit: sl5trxt + offset: 0 + enabled: true + onFailure: retry + readOnly: false + readoutPriority: baseline + needs: + - sl5trxb + - sl5trxt + xbimtrx: description: ESbox2 beam intensity monitor x movement deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor @@ -822,4 +882,37 @@ dettrx: onFailure: retry enabled: true readoutPriority: baseline - softwareTrigger: false \ No newline at end of file + softwareTrigger: false + + +#################### +### Beamstop control for flight tube +#################### + +beamstop_control: + description: Gain control for beamstop flightube + deviceClass: csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl + deviceConfig: + gain_lsb: galilrioesft.digital_out.ch0 # Pin 10 -> Galil ch0 + gain_mid: galilrioesft.digital_out.ch1 # Pin 11 -> Galil ch1 + gain_msb: galilrioesft.digital_out.ch2 # Pin 12 -> Galil ch2 + coupling: galilrioesft.digital_out.ch3 # Pin 13 -> Galil ch3 + speed_mode: galilrioesft.digital_out.ch4 # Pin 14 -> Galil ch4 + enabled: true + readoutPriority: baseline + onFailure: retry + needs: + - galilrioesft + +galilrioesft: + description: Galil RIO for remote gain switching and slow reading FlightTube + deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO + deviceConfig: + host: galilrioesft.psi.ch + enabled: true + onFailure: retry + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 + + diff --git a/csaxs_bec/device_configs/bl_frontend.yaml b/csaxs_bec/device_configs/bl_frontend.yaml index 2471ac8..0ab1ae9 100644 --- a/csaxs_bec/device_configs/bl_frontend.yaml +++ b/csaxs_bec/device_configs/bl_frontend.yaml @@ -199,6 +199,25 @@ xbpm1c4: readOnly: true softwareTrigger: false +bpm1: + description: 'XBPM1 (frontend)' + deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM + deviceConfig: + left_top: xbpm1c1 + right_top: xbpm1c2 + right_bot: xbpm1c3 + left_bot: xbpm1c4 + onFailure: raise + enabled: true + readoutPriority: monitored + readOnly: true + softwareTrigger: false + needs: + - xbpm1c1 + - xbpm1c2 + - xbpm1c3 + - xbpm1c4 + ############################################ ######### End of xbpm sub devices ########## ############################################ diff --git a/csaxs_bec/device_configs/bl_optics_hutch.yaml b/csaxs_bec/device_configs/bl_optics_hutch.yaml index c08cc68..abb3a4a 100644 --- a/csaxs_bec/device_configs/bl_optics_hutch.yaml +++ b/csaxs_bec/device_configs/bl_optics_hutch.yaml @@ -68,21 +68,22 @@ ccmx: - cSAXS - optics -# TODO: motion does not stop at end of movement. Issue with device class +# TO BE REVIEWED, REMOVE VELOCITY WITH NEW CLASS! ccm_energy: - readoutPriority: baseline + description: 'test' deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner - deviceConfig: - prefix: "X12SA-OP-CCM1:" + deviceConfig: + prefix: 'X12SA-OP-CCM1:' override_suffixes: user_readback: "ENERGY-GET" user_setpoint: "ENERGY-SET" velocity: "ROTY.VELO" motor_done_move: "ROTY.DMOV" - deviceTags: - - user motors + onFailure: buffer enabled: true + readoutPriority: baseline readOnly: false + softwareTrigger: false diff --git a/csaxs_bec/device_configs/test_config.yaml b/csaxs_bec/device_configs/test_config.yaml new file mode 100644 index 0000000..3df835a --- /dev/null +++ b/csaxs_bec/device_configs/test_config.yaml @@ -0,0 +1,24 @@ +galilrioesxbox: + description: Galil RIO for remote gain switching and slow reading ES XBox + deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO + deviceConfig: + host: galilrioesft.psi.ch + enabled: true + onFailure: raise + readOnly: false + readoutPriority: baseline + connectionTimeout: 20 +bpm1: + readoutPriority: baseline + deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM + deviceConfig: + blade_t: galilrioesxbox.analog_in.ch0 + blade_r: galilrioesxbox.analog_in.ch1 + blade_b: galilrioesxbox.analog_in.ch2 + blade_l: galilrioesxbox.analog_in.ch3 + enabled: true + readOnly: false + softwareTrigger: true + needs: + - galilrioesxbox + diff --git a/csaxs_bec/devices/omny/shutter.py b/csaxs_bec/devices/omny/shutter.py index 1f10051..6e5e4fe 100644 --- a/csaxs_bec/devices/omny/shutter.py +++ b/csaxs_bec/devices/omny/shutter.py @@ -56,7 +56,6 @@ class OMNYFastShutter(PSIDeviceBase, Device): def fshopen(self): """Open the fast shutter.""" if self._check_if_cSAXS_shutter_exists_in_config(): - self.shutter.put(1) return self.device_manager.devices["fsh"].fshopen() else: self.shutter.put(1) @@ -64,7 +63,6 @@ class OMNYFastShutter(PSIDeviceBase, Device): def fshclose(self): """Close the fast shutter.""" if self._check_if_cSAXS_shutter_exists_in_config(): - self.shutter.put(0) return self.device_manager.devices["fsh"].fshclose() else: self.shutter.put(0) diff --git a/csaxs_bec/devices/pseudo_devices/__init__.py b/csaxs_bec/devices/pseudo_devices/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/csaxs_bec/devices/pseudo_devices/bpm.py b/csaxs_bec/devices/pseudo_devices/bpm.py new file mode 100644 index 0000000..9ba720f --- /dev/null +++ b/csaxs_bec/devices/pseudo_devices/bpm.py @@ -0,0 +1,172 @@ +"""Module for a BPM pseudo device that computes the position and intensity from the blade signals.""" + +from ophyd import Component as Cpt +from ophyd import Kind, Signal +from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase +from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal + + +class BPM(PSIPseudoDeviceBase): + """BPM positioner pseudo device.""" + + # Blade signals, a,b,c,d + left_top = Cpt( + BECProcessedSignal, + name="left_top", + model_config=None, + kind=Kind.config, + doc="BPM left_top blade", + ) + right_top = Cpt( + BECProcessedSignal, + name="right_top", + model_config=None, + kind=Kind.config, + doc="BPM right_top blade", + ) + right_bot = Cpt( + BECProcessedSignal, + name="right_bot", + model_config=None, + kind=Kind.config, + doc="BPM right_bottom blade", + ) + left_bot = Cpt( + BECProcessedSignal, + name="left_bot", + model_config=None, + kind=Kind.config, + doc="BPM left_bot blade", + ) + + # Virtual signals + pos_x = Cpt( + BECProcessedSignal, + name="pos_x", + model_config=None, + kind=Kind.config, + doc="BPM X position, -1 fully left, 1 fully right", + ) + pos_y = Cpt( + BECProcessedSignal, + name="pos_y", + model_config=None, + kind=Kind.config, + doc="BPM Y position, -1 fully bottom, 1 fully top", + ) + diagonal = Cpt( + BECProcessedSignal, + name="diagonal", + model_config=None, + kind=Kind.config, + doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot", + ) + intensity = Cpt( + BECProcessedSignal, + name="intensity", + model_config=None, + kind=Kind.config, + doc="BPM intensity", + ) + + def __init__( + self, + name, + left_top: str, + right_top: str, + right_bot: str, + left_bot: str, + device_manager=None, + scan_info=None, + **kwargs, + ): + super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs) + # Get all blade signal objects from utility method + signal_t = self.left_top.get_device_object_from_bec( + object_name=left_top, signal_name=self.name, device_manager=device_manager + ) + signal_r = self.right_top.get_device_object_from_bec( + object_name=right_top, signal_name=self.name, device_manager=device_manager + ) + signal_b = self.right_bot.get_device_object_from_bec( + object_name=right_bot, signal_name=self.name, device_manager=device_manager + ) + signal_l = self.left_bot.get_device_object_from_bec( + object_name=left_bot, signal_name=self.name, device_manager=device_manager + ) + + # Set compute methods for blade signals and virtual signals + self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t) + self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r) + self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b) + self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l) + + self.intensity.set_compute_method( + self._compute_intensity, + left_top=self.left_top, + right_top=self.right_top, + right_bot=self.right_bot, + left_bot=self.left_bot, + ) + self.pos_x.set_compute_method( + self._compute_pos_x, + left_bot=self.left_bot, + left_top=self.left_top, + right_top=self.right_top, + right_bot=self.right_bot, + ) + self.pos_y.set_compute_method( + self._compute_pos_y, + left_bot=self.left_bot, + left_top=self.left_top, + right_top=self.right_top, + right_bot=self.right_bot, + ) + self.diagonal.set_compute_method( + self._compute_diagonal, + left_bot=self.left_bot, + left_top=self.left_top, + right_top=self.right_top, + right_bot=self.right_bot, + ) + + def _compute_blade_signal(self, signal: Signal) -> float: + return signal.get() + + def _compute_intensity( + self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal + ) -> float: + intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get() + return intensity + + def _compute_pos_x( + self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal + ) -> float: + """X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side.""" + sum_left = left_bot.get() + left_top.get() + sum_right = right_top.get() + right_bot.get() + sum_total = sum_left + sum_right + if sum_total == 0: + return 0.0 + return (sum_right - sum_left) / sum_total + + def _compute_pos_y( + self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal + ) -> float: + """Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side.""" + sum_top = left_top.get() + right_top.get() + sum_bot = right_bot.get() + left_bot.get() + sum_total = sum_top + sum_bot + if sum_total == 0: + return 0.0 + return (sum_top - sum_bot) / sum_total + + def _compute_diagonal( + self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal + ) -> float: + sum_diag1 = left_bot.get() + right_top.get() + sum_diag2 = left_top.get() + right_bot.get() + sum_total = sum_diag1 + sum_diag2 + if sum_total == 0: + return 0.0 + return (sum_diag1 - sum_diag2) / sum_total diff --git a/csaxs_bec/devices/pseudo_devices/bpm_control.py b/csaxs_bec/devices/pseudo_devices/bpm_control.py new file mode 100644 index 0000000..7388473 --- /dev/null +++ b/csaxs_bec/devices/pseudo_devices/bpm_control.py @@ -0,0 +1,189 @@ +""" +Module for controlling the BPM amplifier settings, such as gain and coupling. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal + +from ophyd import Component as Cpt +from ophyd import Kind +from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase +from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal + +if TYPE_CHECKING: # pragma: no cover + from bec_lib.devicemanager import ScanInfo + from bec_server.device_server.devices.devicemanager import DeviceManagerDS + from ophyd import Signal + +_GAIN_BITS_LOW_NOISE: dict[tuple, int] = { + (0, 0, 0): int(1e3), + (0, 0, 1): int(1e4), + (0, 1, 0): int(1e5), + (0, 1, 1): int(1e6), + (1, 0, 0): int(1e7), + (1, 0, 1): int(1e8), + (1, 1, 0): int(1e9), +} + +_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = { + (0, 0, 0): int(1e5), + (0, 0, 1): int(1e6), + (0, 1, 0): int(1e7), + (0, 1, 1): int(1e8), + (1, 0, 0): int(1e9), + (1, 0, 1): int(1e10), + (1, 1, 0): int(1e11), +} + +_GAIN_TO_BITS: dict[int, tuple] = {} +for _bits, _gain in _GAIN_BITS_LOW_NOISE.items(): + _GAIN_TO_BITS[_gain] = (*_bits, True) +for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items(): + if _gain not in _GAIN_TO_BITS: # low-noise takes priority + _GAIN_TO_BITS[_gain] = (*_bits, False) + +VALID_GAINS = sorted(_GAIN_TO_BITS.keys()) + + +class BPMControl(PSIPseudoDeviceBase): + """ + BPM amplifier control pseudo device. It is responsible for controlling the + gain and coupling for the BPM amplifier. It relies on signals from a device + in BEC to be available. For cSAXS, these are most liikely to be from the + GalilRIO device that controls the BPM amplifier. + + Args: + name (str): Name of the pseudo device. + gain_lsb (str): Name of the signal in BEC that controls the LSB + of the gain setting. + gain_mid (str): Name of the signal in BEC that controls the MID + bit of the gain setting. + gain_msb (str): Name of the signal in BEC that controls the MSB + of the gain setting. + coupling (str): Name of the signal in BEC that controls the coupling + setting. + speed_mode (str): Name of the signal in BEC that controls the speed mode + (low-noise vs high-speed) of the amplifier. + """ + + USER_ACCESS = ["set_gain", "set_coupling"] + + gain = Cpt( + BECProcessedSignal, + name="gain", + model_config=None, + kind=Kind.config, + doc="Gain of the amplifier", + ) + coupling = Cpt( + BECProcessedSignal, + name="coupling", + model_config=None, + kind=Kind.config, + doc="Coupling of the amplifier", + ) + speed = Cpt( + BECProcessedSignal, + name="speed", + model_config=None, + kind=Kind.config, + doc="Speed of the amplifier", + ) + + def __init__( + self, + name: str, + gain_lsb: str, + gain_mid: str, + gain_msb: str, + coupling: str, + speed_mode: str, + device_manager: DeviceManagerDS | None = None, + scan_info: ScanInfo | None = None, + **kwargs, + ): + super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs) + + # First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class. + self._gain_lsb = self.gain.get_device_object_from_bec( + object_name=gain_lsb, signal_name=self.name, device_manager=device_manager + ) + self._gain_mid = self.gain.get_device_object_from_bec( + object_name=gain_mid, signal_name=self.name, device_manager=device_manager + ) + self._gain_msb = self.gain.get_device_object_from_bec( + object_name=gain_msb, signal_name=self.name, device_manager=device_manager + ) + self._coupling = self.gain.get_device_object_from_bec( + object_name=coupling, signal_name=self.name, device_manager=device_manager + ) + self._speed_mode = self.gain.get_device_object_from_bec( + object_name=speed_mode, signal_name=self.name, device_manager=device_manager + ) + + # Set the compute methods for the virtual signals. + self.gain.set_compute_method( + self._compute_gain, + msb=self._gain_msb, + mid=self._gain_mid, + lsb=self._gain_lsb, + speed_mode=self._speed_mode, + ) + self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling) + self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode) + + def set_gain( + self, + gain: Literal[ + 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000 + ], + ) -> None: + """ + Set the gain of the amplifier. + + Args: + gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000. + """ + gain_int = int(gain) + if gain_int not in VALID_GAINS: + raise ValueError( + f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}" + ) + + msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int] + + self._gain_msb.set(bool(msb)).wait(timeout=2) + self._gain_lsb.set(bool(lsb)).wait(timeout=2) + self._gain_mid.set(bool(mid)).wait(timeout=2) + self._speed_mode.set(bool(use_low_noise)) + + def set_coupling(self, coupling: Literal["AC", "DC"]) -> None: + """ + Set the coupling of the amplifier. + + Args: + coupling (Literal): Must be either "AC" or "DC". + """ + if coupling not in ["AC", "DC"]: + raise ValueError( + f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'" + ) + self._coupling.set(coupling == "DC").wait(timeout=2) + + def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int: + """Compute the gain based on the bits and speed mode.""" + bits = (msb.get(), mid.get(), lsb.get()) + speed_mode = speed_mode.get() + if speed_mode: + return _GAIN_BITS_LOW_NOISE.get(bits) + else: + return _GAIN_BITS_HIGH_SPEED.get(bits) + + def _compute_coupling(self, coupling: Signal) -> str: + """Compute the coupling based on the signal.""" + return "DC" if coupling.get() else "AC" + + def _compute_speed(self, speed: Signal) -> str: + """Compute the speed based on the signal.""" + return "low_speed" if speed.get() else "high_speed" diff --git a/csaxs_bec/devices/pseudo_devices/dlpca200_settings.py b/csaxs_bec/devices/pseudo_devices/dlpca200_settings.py new file mode 100644 index 0000000..170db5a --- /dev/null +++ b/csaxs_bec/devices/pseudo_devices/dlpca200_settings.py @@ -0,0 +1 @@ +# from ophyd \ No newline at end of file diff --git a/tests/tests_devices/test_pseudo_devices.py b/tests/tests_devices/test_pseudo_devices.py new file mode 100644 index 0000000..60c59a2 --- /dev/null +++ b/tests/tests_devices/test_pseudo_devices.py @@ -0,0 +1,241 @@ +"""Module to test the pseudo_device module.""" + +import pytest +from bec_lib.atlas_models import Device +from ophyd_devices.sim.sim_signals import SetableSignal + +from csaxs_bec.devices.pseudo_devices.bpm import BPM +from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl + + +@pytest.fixture +def patched_dm(dm_with_devices): + # Patch missing current_session attribute in the device manager + dm = dm_with_devices + setattr(dm, "current_session", dm._session) + # + signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config") + signal_mid = SetableSignal(name="gain_mid", value=0, kind="config") + signal_msb = SetableSignal(name="gain_msb", value=0, kind="config") + signal_coupling = SetableSignal(name="coupling", value=0, kind="config") + signal_speed = SetableSignal(name="speed_mode", value=0, kind="config") + for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]: + dev_cfg = Device( + name=signal.name, + deviceClass="ophyd_devices.sim.sim_signals.SetableSignal", + enabled=True, + readoutPriority="baseline", + ) + dm._session["devices"].append(dev_cfg.model_dump()) + dm.devices._add_device(signal.name, signal) + return dm + + +@pytest.fixture +def bpm_control(patched_dm): + name = "bpm_control" + control_config = Device( + name=name, + deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl", + enabled=True, + readoutPriority="baseline", + deviceConfig={ + "gain_lsb": "gain_lsb", + "gain_mid": "gain_mid", + "gain_msb": "gain_msb", + "coupling": "coupling", + "speed_mode": "speed_mode", + }, + needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"], + ) + patched_dm._session["devices"].append(control_config.model_dump()) + try: + control = BPMControl( + name=name, + gain_lsb="gain_lsb", + gain_mid="gain_mid", + gain_msb="gain_msb", + coupling="coupling", + speed_mode="speed_mode", + device_manager=patched_dm, + ) + patched_dm.devices._add_device(control.name, control) + control.wait_for_connection() + yield control + finally: + control.destroy() + + +def test_bpm_control_set_gain(bpm_control): + gain_lsb = bpm_control.device_manager.devices["gain_lsb"] + gain_mid = bpm_control.device_manager.devices["gain_mid"] + gain_msb = bpm_control.device_manager.devices["gain_msb"] + coupling = bpm_control.device_manager.devices["coupling"] + speed_mode = bpm_control.device_manager.devices["speed_mode"] + gain_lsb.put(0) + gain_mid.put(0) + gain_msb.put(0) + coupling.put(0) + speed_mode.put(1) + + gain = bpm_control.gain.get() + assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1) + + gain_val = 10000000 + bpm_control.set_gain(gain_val) + assert _GAIN_TO_BITS.get(gain_val, ()) == ( + gain_msb.get(), + gain_mid.get(), + gain_lsb.get(), + speed_mode.get(), + ) + + gain_val = 100000000000 + bpm_control.set_gain(gain_val) + assert _GAIN_TO_BITS.get(gain_val, ()) == ( + gain_msb.get(), + gain_mid.get(), + gain_lsb.get(), + speed_mode.get(), + ) + + with pytest.raises(ValueError): + bpm_control.set_gain(1005.0) + + +def test_bpm_control_set_coupling(bpm_control): + coupling = bpm_control.device_manager.devices["coupling"] + coupling.put(0) + + bpm_control.coupling.get() == "AC" + coupling.put(1) + bpm_control.coupling.get() == "DC" + + bpm_control.set_coupling("AC") + assert coupling.get() == 0 + + with pytest.raises(ValueError): + bpm_control.set_coupling("wrong") + + +@pytest.fixture +def patched_dm_bpm(dm_with_devices): + # Patch missing current_session attribute in the device manager + dm = dm_with_devices + setattr(dm, "current_session", dm._session) + # + left_top = SetableSignal(name="left_top", value=0, kind="config") + right_top = SetableSignal(name="right_top", value=0, kind="config") + right_bot = SetableSignal(name="right_bot", value=0, kind="config") + left_bot = SetableSignal(name="left_bot", value=0, kind="config") + for signal in [left_top, right_top, right_bot, left_bot]: + + dev_cfg = Device( + name=signal.name, + deviceClass="ophyd_devices.sim.sim_signals.SetableSignal", + enabled=True, + readoutPriority="baseline", + ) + dm._session["devices"].append(dev_cfg.model_dump()) + dm.devices._add_device(signal.name, signal) + return dm + + +@pytest.fixture +def bpm(patched_dm_bpm): + name = "bpm" + bpm_config = Device( + name=name, + deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM", + enabled=True, + readoutPriority="baseline", + deviceConfig={ + "left_top": "left_top", + "right_top": "right_top", + "right_bot": "right_bot", + "left_bot": "left_bot", + }, + needs=["left_top", "right_top", "right_bot", "left_bot"], + ) + patched_dm_bpm._session["devices"].append(bpm_config.model_dump()) + try: + bpm = BPM( + name=name, + left_top="left_top", + right_top="right_top", + right_bot="right_bot", + left_bot="left_bot", + device_manager=patched_dm_bpm, + ) + patched_dm_bpm.devices._add_device(bpm.name, bpm) + bpm.wait_for_connection() + yield bpm + finally: + bpm.destroy() + + +def test_bpm_positions(bpm): + left_top = bpm.device_manager.devices["left_top"] + right_top = bpm.device_manager.devices["right_top"] + right_bot = bpm.device_manager.devices["right_bot"] + left_bot = bpm.device_manager.devices["left_bot"] + + # Test center position + for signal in [left_top, right_top, right_bot, left_bot]: + signal.put(1) + assert bpm.pos_x.get() == 0 + assert bpm.pos_y.get() == 0 + + # Test fully left + left_top.put(1) + right_top.put(0) + right_bot.put(0) + left_bot.put(1) + assert bpm.pos_x.get() == -1 + assert bpm.pos_y.get() == 0 + assert bpm.diagonal.get() == 0 + assert bpm.intensity.get() == 2 + + # Test fully right + left_top.put(0) + right_top.put(1) + right_bot.put(1) + left_bot.put(0) + assert bpm.pos_x.get() == 1 + assert bpm.pos_y.get() == 0 + assert bpm.diagonal.get() == 0 + + # Test fully top + left_top.put(1) + right_top.put(1) + right_bot.put(0) + left_bot.put(0) + assert bpm.pos_x.get() == 0 + assert bpm.pos_y.get() == 1 + assert bpm.diagonal.get() == 0 + + # Test fully bottom + left_top.put(0) + right_top.put(0) + right_bot.put(1) + left_bot.put(1) + assert bpm.pos_x.get() == 0 + assert bpm.pos_y.get() == -1 + assert bpm.diagonal.get() == 0 + + # Diagonal beam + left_top.put(1) + right_top.put(0) + right_bot.put(1) + left_bot.put(0) + assert bpm.pos_x.get() == 0 + assert bpm.pos_y.get() == 0 + assert bpm.diagonal.get() == -1 + + left_top.put(0) + right_top.put(1) + right_bot.put(0) + left_bot.put(1) + assert bpm.pos_x.get() == 0 + assert bpm.pos_y.get() == 0 + assert bpm.diagonal.get() == 1