fix(pseudo_devices): fix pseudo devices, bpm and bpm_control

This commit is contained in:
2026-03-27 20:00:17 +01:00
parent fecd4b84a4
commit 9557d98f30
2 changed files with 206 additions and 101 deletions

View File

@@ -1,141 +1,172 @@
import time
from typing import TYPE_CHECKING
"""Module for a BPM pseudo device that computes the position and intensity from the blade signals."""
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
if TYPE_CHECKING: #pragma-nocover....
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO
class BPM(PSIPseudoDeviceBase):
"""BPM positioner pseudo device."""
# Blade signals, a,b,c,d
top = Cpt(
BECProcessedSignal, name="top", model_config=None, kind=Kind.config, doc="... top blade"
left_top = Cpt(
BECProcessedSignal,
name="left_top",
model_config=None,
kind=Kind.config,
doc="BPM left_top blade",
)
right = Cpt(
BECProcessedSignal, name="right", model_config=None, kind=Kind.config, doc="... right blade"
right_top = Cpt(
BECProcessedSignal,
name="right_top",
model_config=None,
kind=Kind.config,
doc="BPM right_top blade",
)
bot = Cpt(
BECProcessedSignal, name="bot", model_config=None, kind=Kind.config, doc="... bot blade"
right_bot = Cpt(
BECProcessedSignal,
name="right_bot",
model_config=None,
kind=Kind.config,
doc="BPM right_bottom blade",
)
left = Cpt(
BECProcessedSignal, name="left", model_config=None, kind=Kind.config, doc="... left blade"
left_bot = Cpt(
BECProcessedSignal,
name="left_bot",
model_config=None,
kind=Kind.config,
doc="BPM left_bot blade",
)
# Virtual signals
pos_x = Cpt(
BECProcessedSignal, name="pos_x", model_config=None, kind=Kind.config, doc="... pos_x"
BECProcessedSignal,
name="pos_x",
model_config=None,
kind=Kind.config,
doc="BPM X position, -1 fully left, 1 fully right",
)
pos_y = Cpt(
BECProcessedSignal, name="pos_y", model_config=None, kind=Kind.config, doc="... pos_y"
BECProcessedSignal,
name="pos_y",
model_config=None,
kind=Kind.config,
doc="BPM Y position, -1 fully bottom, 1 fully top",
)
diagonal = Cpt(
BECProcessedSignal, name="diagonal", model_config=None, kind=Kind.config, doc="... diagonal"
BECProcessedSignal,
name="diagonal",
model_config=None,
kind=Kind.config,
doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot",
)
intensity = Cpt(
BECProcessedSignal,
name="intensity",
model_config=None,
kind=Kind.config,
doc="... intensity",
doc="BPM intensity",
)
def __init__(
self,
name,
blade_t: str,
blade_r: str,
blade_b: str,
blade_l: str,
left_top: str,
right_top: str,
right_bot: str,
left_bot: str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# Get all blade signal objects from utility method
signal_t = self.top.get_device_object_from_bec(
object_name=blade_t, signal_name=self.name, device_manager=device_manager
signal_t = self.left_top.get_device_object_from_bec(
object_name=left_top, signal_name=self.name, device_manager=device_manager
)
signal_r = self.right.get_device_object_from_bec(
object_name=blade_r, signal_name=self.name, device_manager=device_manager
signal_r = self.right_top.get_device_object_from_bec(
object_name=right_top, signal_name=self.name, device_manager=device_manager
)
signal_b = self.bot.get_device_object_from_bec(
object_name=blade_b, signal_name=self.name, device_manager=device_manager
signal_b = self.right_bot.get_device_object_from_bec(
object_name=right_bot, signal_name=self.name, device_manager=device_manager
)
signal_l = self.left.get_device_object_from_bec(
object_name=blade_l, signal_name=self.name, device_manager=device_manager
signal_l = self.left_bot.get_device_object_from_bec(
object_name=left_bot, signal_name=self.name, device_manager=device_manager
)
# Set compute methods for blade signals and virtual signals
self.top.set_compute_method(self._compute_blade_signal, signal=signal_t)
self.right.set_compute_method(self._compute_blade_signal, signal=signal_r)
self.bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
self.left.set_compute_method(self._compute_blade_signal, signal=signal_l)
self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t)
self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r)
self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l)
self.intensity.set_compute_method(
self._compute_intensity, top=self.top, right=self.right, bot=self.bot, left=self.left
self._compute_intensity,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
left_bot=self.left_bot,
)
self.pos_x.set_compute_method(
self._compute_pos_x, left=self.left, top=self.top, right=self.right, bot=self.bot
self._compute_pos_x,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.pos_y.set_compute_method(
self._compute_pos_y, left=self.left, top=self.top, right=self.right, bot=self.bot
self._compute_pos_y,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.diagonal.set_compute_method(
self._compute_diagonal, left=self.left, top=self.top, right=self.right, bot=self.bot
self._compute_diagonal,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
def _compute_blade_signal(self, signal: Signal) -> float:
return signal.get()
def _compute_intensity(self, top: Signal, right: Signal, bot: Signal, left: Signal) -> float:
intensity = top.get() + right.get() + bot.get() + left.get()
def _compute_intensity(
self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal
) -> float:
intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get()
return intensity
def _compute_pos_x(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float:
sum_left = left.get() + top.get()
sum_right = right.get() + bot.get()
def _compute_pos_x(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side."""
sum_left = left_bot.get() + left_top.get()
sum_right = right_top.get() + right_bot.get()
sum_total = sum_left + sum_right
if sum_total == 0:
return 0.0
return (sum_left - sum_right) / sum_total
return (sum_right - sum_left) / sum_total
def _compute_pos_y(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float:
sum_top = top.get() + right.get()
sum_bot = bot.get() + left.get()
def _compute_pos_y(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side."""
sum_top = left_top.get() + right_top.get()
sum_bot = right_bot.get() + left_bot.get()
sum_total = sum_top + sum_bot
if sum_total == 0:
return 0.0
return (sum_top - sum_bot) / sum_total
def _compute_diagonal(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float:
sum_diag1 = left.get() + right.get()
sum_diag2 = top.get() + bot.get()
def _compute_diagonal(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
sum_diag1 = left_bot.get() + right_top.get()
sum_diag2 = left_top.get() + right_bot.get()
sum_total = sum_diag1 + sum_diag2
if sum_total == 0:
return 0.0
return (sum_diag1 - sum_diag2) / sum_total
class BPMRio(BPM):
def __init__(
self,
name,
blade_t: str,
blade_r: str,
blade_b: str,
blade_l: str,
rio:str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, blade_t=blade_t, blade_r=blade_r, blade_b=blade_b, blade_l=blade_l, device_manager=device_manager, scan_info=scan_info, **kwargs)
rio:GalilRIO = self.top.get_device_object_from_bec(object_name=rio, signal_name=name, device_manager=device_manager)

View File

@@ -1,7 +1,20 @@
"""
Module for controlling the BPM amplifier settings, such as gain and coupling.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from ophyd import Component as Cpt
from ophyd import Kind
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
from ophyd import Component as Cpt, Kind, Signal
from typing import Literal
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
from ophyd import Signal
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
@@ -33,29 +46,66 @@ for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class BPMControl(PSIPseudoDeviceBase):
"""
BPM amplifier control pseudo device. It is responsible for controlling the
gain and coupling for the BPM amplifier. It relies on signals from a device
in BEC to be available. For cSAXS, these are most liikely to be from the
GalilRIO device that controls the BPM amplifier.
Args:
name (str): Name of the pseudo device.
gain_lsb (str): Name of the signal in BEC that controls the LSB
of the gain setting.
gain_mid (str): Name of the signal in BEC that controls the MID
bit of the gain setting.
gain_msb (str): Name of the signal in BEC that controls the MSB
of the gain setting.
coupling (str): Name of the signal in BEC that controls the coupling
setting.
speed_mode (str): Name of the signal in BEC that controls the speed mode
(low-noise vs high-speed) of the amplifier.
"""
USER_ACCESS = ["set_gain", "set_coupling"]
gain = Cpt(BECProcessedSignal, name="gain", model_config=None, kind=Kind.config, doc="Gain of the amplifier")
coupling = Cpt(BECProcessedSignal, name="coupling", model_config=None, kind=Kind.config, doc="Coupling of the amplifier")
speed = Cpt(BECProcessedSignal, name="speed", model_config=None, kind=Kind.config, doc="Speed of the amplifier")
gain = Cpt(
BECProcessedSignal,
name="gain",
model_config=None,
kind=Kind.config,
doc="Gain of the amplifier",
)
coupling = Cpt(
BECProcessedSignal,
name="coupling",
model_config=None,
kind=Kind.config,
doc="Coupling of the amplifier",
)
speed = Cpt(
BECProcessedSignal,
name="speed",
model_config=None,
kind=Kind.config,
doc="Speed of the amplifier",
)
def __init__(
self,
name,
gain_lsb,
gain_mid,
gain_msb,
coupling,
speed_mode,
device_manager=None,
scan_info=None,
name: str,
gain_lsb: str,
gain_mid: str,
gain_msb: str,
coupling: str,
speed_mode: str,
device_manager: DeviceManagerDS | None = None,
scan_info: ScanInfo | None = None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# Get all related signals to inputs
# First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class.
self._gain_lsb = self.gain.get_device_object_from_bec(
object_name=gain_lsb, signal_name=self.name, device_manager=device_manager
)
@@ -71,22 +121,36 @@ class BPMControl(PSIPseudoDeviceBase):
self._speed_mode = self.gain.get_device_object_from_bec(
object_name=speed_mode, signal_name=self.name, device_manager=device_manager
)
# Set computed signals
self.gain.set_compute_method(self._compute_gain, msb=self._gain_msb, mid=self._gain_mid, lsb=self._gain_lsb, speed_mode=self._speed_mode)
# Set the compute methods for the virtual signals.
self.gain.set_compute_method(
self._compute_gain,
msb=self._gain_msb,
mid=self._gain_mid,
lsb=self._gain_lsb,
speed_mode=self._speed_mode,
)
self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling)
self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode)
def set_gain(self, gain:Literal[1e3,1e4,1e5,1e6,1e8,1e9,1e10,1e11]) -> None:
def set_gain(
self,
gain: Literal[
1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000
],
) -> None:
"""
Set the gain of the amplifiert.
Set the gain of the amplifier.
Args:
gain (Literal): Must be one of ....
gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000.
"""
gain_int = int(gain)
if gain_int not in VALID_GAINS:
raise ValueError(f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}")
raise ValueError(
f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}"
)
msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int]
self._gain_msb.set(bool(msb)).wait(timeout=2)
@@ -94,22 +158,32 @@ class BPMControl(PSIPseudoDeviceBase):
self._gain_mid.set(bool(mid)).wait(timeout=2)
self._speed_mode.set(bool(use_low_noise))
def set_coupling(self, coupling:Literal["AC", "DC"]) -> None:
if coupling not in ["AC", "DC"]:
raise ValueError(f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'")
self._coupling.set(coupling=="DC").wait(timeout=2)
def set_coupling(self, coupling: Literal["AC", "DC"]) -> None:
"""
Set the coupling of the amplifier.
def _compute_gain(self, msb:Signal, mid:Signal, lsb:Signal, speed_mode:Signal) -> int:
Args:
coupling (Literal): Must be either "AC" or "DC".
"""
if coupling not in ["AC", "DC"]:
raise ValueError(
f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'"
)
self._coupling.set(coupling == "DC").wait(timeout=2)
def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int:
"""Compute the gain based on the bits and speed mode."""
bits = (msb.get(), mid.get(), lsb.get())
speed_mode = speed_mode.get()
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
def _compute_coupling(self, coupling:Signal) -> str:
def _compute_coupling(self, coupling: Signal) -> str:
"""Compute the coupling based on the signal."""
return "DC" if coupling.get() else "AC"
def _compute_speed(self, speed:Signal) -> str:
def _compute_speed(self, speed: Signal) -> str:
"""Compute the speed based on the signal."""
return "low_speed" if speed.get() else "high_speed"