From 9557d98f304517dfe4fcd917ce2961c6c36fcd03 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 27 Mar 2026 20:00:17 +0100 Subject: [PATCH] fix(pseudo_devices): fix pseudo devices, bpm and bpm_control --- csaxs_bec/devices/pseudo_devices/bpm.py | 169 +++++++++++------- .../devices/pseudo_devices/bpm_control.py | 138 ++++++++++---- 2 files changed, 206 insertions(+), 101 deletions(-) diff --git a/csaxs_bec/devices/pseudo_devices/bpm.py b/csaxs_bec/devices/pseudo_devices/bpm.py index c43ccc4..9ba720f 100644 --- a/csaxs_bec/devices/pseudo_devices/bpm.py +++ b/csaxs_bec/devices/pseudo_devices/bpm.py @@ -1,141 +1,172 @@ -import time -from typing import TYPE_CHECKING +"""Module for a BPM pseudo device that computes the position and intensity from the blade signals.""" + from ophyd import Component as Cpt from ophyd import Kind, Signal from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal -if TYPE_CHECKING: #pragma-nocover.... - from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO - - class BPM(PSIPseudoDeviceBase): """BPM positioner pseudo device.""" # Blade signals, a,b,c,d - top = Cpt( - BECProcessedSignal, name="top", model_config=None, kind=Kind.config, doc="... top blade" + left_top = Cpt( + BECProcessedSignal, + name="left_top", + model_config=None, + kind=Kind.config, + doc="BPM left_top blade", ) - right = Cpt( - BECProcessedSignal, name="right", model_config=None, kind=Kind.config, doc="... right blade" + right_top = Cpt( + BECProcessedSignal, + name="right_top", + model_config=None, + kind=Kind.config, + doc="BPM right_top blade", ) - bot = Cpt( - BECProcessedSignal, name="bot", model_config=None, kind=Kind.config, doc="... bot blade" + right_bot = Cpt( + BECProcessedSignal, + name="right_bot", + model_config=None, + kind=Kind.config, + doc="BPM right_bottom blade", ) - left = Cpt( - BECProcessedSignal, name="left", model_config=None, kind=Kind.config, doc="... left blade" + left_bot = Cpt( + BECProcessedSignal, + name="left_bot", + model_config=None, + kind=Kind.config, + doc="BPM left_bot blade", ) # Virtual signals pos_x = Cpt( - BECProcessedSignal, name="pos_x", model_config=None, kind=Kind.config, doc="... pos_x" + BECProcessedSignal, + name="pos_x", + model_config=None, + kind=Kind.config, + doc="BPM X position, -1 fully left, 1 fully right", ) pos_y = Cpt( - BECProcessedSignal, name="pos_y", model_config=None, kind=Kind.config, doc="... pos_y" + BECProcessedSignal, + name="pos_y", + model_config=None, + kind=Kind.config, + doc="BPM Y position, -1 fully bottom, 1 fully top", ) diagonal = Cpt( - BECProcessedSignal, name="diagonal", model_config=None, kind=Kind.config, doc="... diagonal" + BECProcessedSignal, + name="diagonal", + model_config=None, + kind=Kind.config, + doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot", ) intensity = Cpt( BECProcessedSignal, name="intensity", model_config=None, kind=Kind.config, - doc="... intensity", + doc="BPM intensity", ) def __init__( self, name, - blade_t: str, - blade_r: str, - blade_b: str, - blade_l: str, + left_top: str, + right_top: str, + right_bot: str, + left_bot: str, device_manager=None, scan_info=None, **kwargs, ): super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs) # Get all blade signal objects from utility method - signal_t = self.top.get_device_object_from_bec( - object_name=blade_t, signal_name=self.name, device_manager=device_manager + signal_t = self.left_top.get_device_object_from_bec( + object_name=left_top, signal_name=self.name, device_manager=device_manager ) - signal_r = self.right.get_device_object_from_bec( - object_name=blade_r, signal_name=self.name, device_manager=device_manager + signal_r = self.right_top.get_device_object_from_bec( + object_name=right_top, signal_name=self.name, device_manager=device_manager ) - signal_b = self.bot.get_device_object_from_bec( - object_name=blade_b, signal_name=self.name, device_manager=device_manager + signal_b = self.right_bot.get_device_object_from_bec( + object_name=right_bot, signal_name=self.name, device_manager=device_manager ) - signal_l = self.left.get_device_object_from_bec( - object_name=blade_l, signal_name=self.name, device_manager=device_manager + signal_l = self.left_bot.get_device_object_from_bec( + object_name=left_bot, signal_name=self.name, device_manager=device_manager ) # Set compute methods for blade signals and virtual signals - self.top.set_compute_method(self._compute_blade_signal, signal=signal_t) - self.right.set_compute_method(self._compute_blade_signal, signal=signal_r) - self.bot.set_compute_method(self._compute_blade_signal, signal=signal_b) - self.left.set_compute_method(self._compute_blade_signal, signal=signal_l) + self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t) + self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r) + self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b) + self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l) self.intensity.set_compute_method( - self._compute_intensity, top=self.top, right=self.right, bot=self.bot, left=self.left + self._compute_intensity, + left_top=self.left_top, + right_top=self.right_top, + right_bot=self.right_bot, + left_bot=self.left_bot, ) self.pos_x.set_compute_method( - self._compute_pos_x, left=self.left, top=self.top, right=self.right, bot=self.bot + self._compute_pos_x, + left_bot=self.left_bot, + left_top=self.left_top, + right_top=self.right_top, + right_bot=self.right_bot, ) self.pos_y.set_compute_method( - self._compute_pos_y, left=self.left, top=self.top, right=self.right, bot=self.bot + self._compute_pos_y, + left_bot=self.left_bot, + left_top=self.left_top, + right_top=self.right_top, + right_bot=self.right_bot, ) self.diagonal.set_compute_method( - self._compute_diagonal, left=self.left, top=self.top, right=self.right, bot=self.bot + self._compute_diagonal, + left_bot=self.left_bot, + left_top=self.left_top, + right_top=self.right_top, + right_bot=self.right_bot, ) def _compute_blade_signal(self, signal: Signal) -> float: return signal.get() - def _compute_intensity(self, top: Signal, right: Signal, bot: Signal, left: Signal) -> float: - intensity = top.get() + right.get() + bot.get() + left.get() + def _compute_intensity( + self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal + ) -> float: + intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get() return intensity - def _compute_pos_x(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float: - sum_left = left.get() + top.get() - sum_right = right.get() + bot.get() + def _compute_pos_x( + self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal + ) -> float: + """X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side.""" + sum_left = left_bot.get() + left_top.get() + sum_right = right_top.get() + right_bot.get() sum_total = sum_left + sum_right if sum_total == 0: return 0.0 - return (sum_left - sum_right) / sum_total + return (sum_right - sum_left) / sum_total - def _compute_pos_y(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float: - sum_top = top.get() + right.get() - sum_bot = bot.get() + left.get() + def _compute_pos_y( + self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal + ) -> float: + """Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side.""" + sum_top = left_top.get() + right_top.get() + sum_bot = right_bot.get() + left_bot.get() sum_total = sum_top + sum_bot if sum_total == 0: return 0.0 return (sum_top - sum_bot) / sum_total - def _compute_diagonal(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float: - sum_diag1 = left.get() + right.get() - sum_diag2 = top.get() + bot.get() + def _compute_diagonal( + self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal + ) -> float: + sum_diag1 = left_bot.get() + right_top.get() + sum_diag2 = left_top.get() + right_bot.get() sum_total = sum_diag1 + sum_diag2 if sum_total == 0: return 0.0 return (sum_diag1 - sum_diag2) / sum_total - - -class BPMRio(BPM): - - def __init__( - self, - name, - blade_t: str, - blade_r: str, - blade_b: str, - blade_l: str, - rio:str, - device_manager=None, - scan_info=None, - **kwargs, - ): - super().__init__(name=name, blade_t=blade_t, blade_r=blade_r, blade_b=blade_b, blade_l=blade_l, device_manager=device_manager, scan_info=scan_info, **kwargs) - rio:GalilRIO = self.top.get_device_object_from_bec(object_name=rio, signal_name=name, device_manager=device_manager) - diff --git a/csaxs_bec/devices/pseudo_devices/bpm_control.py b/csaxs_bec/devices/pseudo_devices/bpm_control.py index f162dc6..7388473 100644 --- a/csaxs_bec/devices/pseudo_devices/bpm_control.py +++ b/csaxs_bec/devices/pseudo_devices/bpm_control.py @@ -1,7 +1,20 @@ +""" +Module for controlling the BPM amplifier settings, such as gain and coupling. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal + +from ophyd import Component as Cpt +from ophyd import Kind from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal -from ophyd import Component as Cpt, Kind, Signal -from typing import Literal + +if TYPE_CHECKING: # pragma: no cover + from bec_lib.devicemanager import ScanInfo + from bec_server.device_server.devices.devicemanager import DeviceManagerDS + from ophyd import Signal _GAIN_BITS_LOW_NOISE: dict[tuple, int] = { (0, 0, 0): int(1e3), @@ -33,29 +46,66 @@ for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items(): VALID_GAINS = sorted(_GAIN_TO_BITS.keys()) - class BPMControl(PSIPseudoDeviceBase): + """ + BPM amplifier control pseudo device. It is responsible for controlling the + gain and coupling for the BPM amplifier. It relies on signals from a device + in BEC to be available. For cSAXS, these are most liikely to be from the + GalilRIO device that controls the BPM amplifier. + + Args: + name (str): Name of the pseudo device. + gain_lsb (str): Name of the signal in BEC that controls the LSB + of the gain setting. + gain_mid (str): Name of the signal in BEC that controls the MID + bit of the gain setting. + gain_msb (str): Name of the signal in BEC that controls the MSB + of the gain setting. + coupling (str): Name of the signal in BEC that controls the coupling + setting. + speed_mode (str): Name of the signal in BEC that controls the speed mode + (low-noise vs high-speed) of the amplifier. + """ USER_ACCESS = ["set_gain", "set_coupling"] - gain = Cpt(BECProcessedSignal, name="gain", model_config=None, kind=Kind.config, doc="Gain of the amplifier") - coupling = Cpt(BECProcessedSignal, name="coupling", model_config=None, kind=Kind.config, doc="Coupling of the amplifier") - speed = Cpt(BECProcessedSignal, name="speed", model_config=None, kind=Kind.config, doc="Speed of the amplifier") + gain = Cpt( + BECProcessedSignal, + name="gain", + model_config=None, + kind=Kind.config, + doc="Gain of the amplifier", + ) + coupling = Cpt( + BECProcessedSignal, + name="coupling", + model_config=None, + kind=Kind.config, + doc="Coupling of the amplifier", + ) + speed = Cpt( + BECProcessedSignal, + name="speed", + model_config=None, + kind=Kind.config, + doc="Speed of the amplifier", + ) def __init__( self, - name, - gain_lsb, - gain_mid, - gain_msb, - coupling, - speed_mode, - device_manager=None, - scan_info=None, + name: str, + gain_lsb: str, + gain_mid: str, + gain_msb: str, + coupling: str, + speed_mode: str, + device_manager: DeviceManagerDS | None = None, + scan_info: ScanInfo | None = None, **kwargs, ): super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs) - # Get all related signals to inputs + + # First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class. self._gain_lsb = self.gain.get_device_object_from_bec( object_name=gain_lsb, signal_name=self.name, device_manager=device_manager ) @@ -71,22 +121,36 @@ class BPMControl(PSIPseudoDeviceBase): self._speed_mode = self.gain.get_device_object_from_bec( object_name=speed_mode, signal_name=self.name, device_manager=device_manager ) - # Set computed signals - self.gain.set_compute_method(self._compute_gain, msb=self._gain_msb, mid=self._gain_mid, lsb=self._gain_lsb, speed_mode=self._speed_mode) + + # Set the compute methods for the virtual signals. + self.gain.set_compute_method( + self._compute_gain, + msb=self._gain_msb, + mid=self._gain_mid, + lsb=self._gain_lsb, + speed_mode=self._speed_mode, + ) self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling) self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode) - def set_gain(self, gain:Literal[1e3,1e4,1e5,1e6,1e8,1e9,1e10,1e11]) -> None: + def set_gain( + self, + gain: Literal[ + 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000 + ], + ) -> None: """ - Set the gain of the amplifiert. + Set the gain of the amplifier. Args: - gain (Literal): Must be one of .... + gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000. """ gain_int = int(gain) if gain_int not in VALID_GAINS: - raise ValueError(f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}") - + raise ValueError( + f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}" + ) + msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int] self._gain_msb.set(bool(msb)).wait(timeout=2) @@ -94,22 +158,32 @@ class BPMControl(PSIPseudoDeviceBase): self._gain_mid.set(bool(mid)).wait(timeout=2) self._speed_mode.set(bool(use_low_noise)) - def set_coupling(self, coupling:Literal["AC", "DC"]) -> None: - if coupling not in ["AC", "DC"]: - raise ValueError(f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'") - self._coupling.set(coupling=="DC").wait(timeout=2) - + def set_coupling(self, coupling: Literal["AC", "DC"]) -> None: + """ + Set the coupling of the amplifier. - def _compute_gain(self, msb:Signal, mid:Signal, lsb:Signal, speed_mode:Signal) -> int: + Args: + coupling (Literal): Must be either "AC" or "DC". + """ + if coupling not in ["AC", "DC"]: + raise ValueError( + f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'" + ) + self._coupling.set(coupling == "DC").wait(timeout=2) + + def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int: + """Compute the gain based on the bits and speed mode.""" bits = (msb.get(), mid.get(), lsb.get()) speed_mode = speed_mode.get() if speed_mode: return _GAIN_BITS_LOW_NOISE.get(bits) else: return _GAIN_BITS_HIGH_SPEED.get(bits) - - def _compute_coupling(self, coupling:Signal) -> str: + + def _compute_coupling(self, coupling: Signal) -> str: + """Compute the coupling based on the signal.""" return "DC" if coupling.get() else "AC" - - def _compute_speed(self, speed:Signal) -> str: + + def _compute_speed(self, speed: Signal) -> str: + """Compute the speed based on the signal.""" return "low_speed" if speed.get() else "high_speed"