feat: Add BPM and BPMControl pseudo devices

This commit is contained in:
x12sa
2026-03-19 13:05:07 +01:00
committed by appel_c
parent b67e1c012c
commit fecd4b84a4
10 changed files with 411 additions and 16 deletions

View File

@@ -70,7 +70,7 @@ DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
"rio_device": "galilrioesxbox",
"description": "Beam Position Monitor 4 current amplifier",
"channels": {
"gain_lsb": 0, # Pin 10 -> Galil ch0
"gain_lsb": rio_optics.analog_in.ch0, # Pin 10 -> Galil ch0
"gain_mid": 1, # Pin 11 -> Galil ch1
"gain_msb": 2, # Pin 12 -> Galil ch2
"coupling": 3, # Pin 13 -> Galil ch3

View File

@@ -544,6 +544,66 @@ sl5trxt:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 5
sl5ch:
description: ESbox1 slit 5 center horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5wh:
description: ESbox1 slit 5 width horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5cv:
description: ESbox1 slit 5 center vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
sl5wv:
description: ESbox1 slit 5 width vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
xbimtrx:
description: ESbox2 beam intensity monitor x movement
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
@@ -822,4 +882,37 @@ dettrx:
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
softwareTrigger: false
####################
### Beamstop control for flight tube
####################
beamstop_control:
description: Gain control for beamstop flightube
deviceClass: csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl
deviceConfig:
gain_lsb: galilrioesft.digital_out.ch0 # Pin 10 -> Galil ch0
gain_mid: galilrioesft.digital_out.ch1 # Pin 11 -> Galil ch1
gain_msb: galilrioesft.digital_out.ch2 # Pin 12 -> Galil ch2
coupling: galilrioesft.digital_out.ch3 # Pin 13 -> Galil ch3
speed_mode: galilrioesft.digital_out.ch4 # Pin 14 -> Galil ch4
enabled: true
readoutPriority: baseline
onFailure: retry
needs:
- galilrioesft
galilrioesft:
description: Galil RIO for remote gain switching and slow reading FlightTube
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20

View File

@@ -199,6 +199,25 @@ xbpm1c4:
readOnly: true
softwareTrigger: false
bpm1:
description: 'XBPM1 (frontend)'
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
blade_t: xbpm1c1
blade_r: xbpm1c2
blade_b: xbpm1c3
blade_l: xbpm1c4
onFailure: raise
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
needs:
- xbpm1c1
- xbpm1c2
- xbpm1c3
- xbpm1c4
############################################
######### End of xbpm sub devices ##########
############################################

View File

@@ -68,18 +68,22 @@ ccmx:
- cSAXS
- optics
# ccm_energy:
# readoutPriority: baseline
# deviceClass: ophyd_devices.devices.simple_positioner.PSIPositionerBase
# prefix: "X12SA-OP-CCM1:"
# override_suffixes:
# user_readback: "ENERGY-GET"
# user_setpoint: "ENERGY-SET"
# velocity: "ROTY:VELO"
# deviceTags:
# - user motors
# enabled: true
# readOnly: false
# TO BE REVIEWED, REMOVE VELOCITY WITH NEW CLASS!
ccm_energy:
description: 'test'
deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner
deviceConfig:
prefix: 'X12SA-OP-CCM1:'
override_suffixes:
user_readback: "ENERGY-GET"
user_setpoint: "ENERGY-SET"
velocity: "ROTY.VELO"
motor_done_move: "ROTY.DMOV"
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false

View File

@@ -0,0 +1,24 @@
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
bpm1:
readoutPriority: baseline
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
blade_t: galilrioesxbox.analog_in.ch0
blade_r: galilrioesxbox.analog_in.ch1
blade_b: galilrioesxbox.analog_in.ch2
blade_l: galilrioesxbox.analog_in.ch3
enabled: true
readOnly: false
softwareTrigger: true
needs:
- galilrioesxbox

View File

@@ -48,7 +48,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshopen(self):
"""Open the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(1)
return self.device_manager.devices["fsh"].fshopen()
else:
self.shutter.put(1)
@@ -56,7 +55,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshclose(self):
"""Close the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(0)
return self.device_manager.devices["fsh"].fshclose()
else:
self.shutter.put(0)

View File

@@ -0,0 +1,141 @@
import time
from typing import TYPE_CHECKING
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
if TYPE_CHECKING: #pragma-nocover....
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO
class BPM(PSIPseudoDeviceBase):
"""BPM positioner pseudo device."""
# Blade signals, a,b,c,d
top = Cpt(
BECProcessedSignal, name="top", model_config=None, kind=Kind.config, doc="... top blade"
)
right = Cpt(
BECProcessedSignal, name="right", model_config=None, kind=Kind.config, doc="... right blade"
)
bot = Cpt(
BECProcessedSignal, name="bot", model_config=None, kind=Kind.config, doc="... bot blade"
)
left = Cpt(
BECProcessedSignal, name="left", model_config=None, kind=Kind.config, doc="... left blade"
)
# Virtual signals
pos_x = Cpt(
BECProcessedSignal, name="pos_x", model_config=None, kind=Kind.config, doc="... pos_x"
)
pos_y = Cpt(
BECProcessedSignal, name="pos_y", model_config=None, kind=Kind.config, doc="... pos_y"
)
diagonal = Cpt(
BECProcessedSignal, name="diagonal", model_config=None, kind=Kind.config, doc="... diagonal"
)
intensity = Cpt(
BECProcessedSignal,
name="intensity",
model_config=None,
kind=Kind.config,
doc="... intensity",
)
def __init__(
self,
name,
blade_t: str,
blade_r: str,
blade_b: str,
blade_l: str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# Get all blade signal objects from utility method
signal_t = self.top.get_device_object_from_bec(
object_name=blade_t, signal_name=self.name, device_manager=device_manager
)
signal_r = self.right.get_device_object_from_bec(
object_name=blade_r, signal_name=self.name, device_manager=device_manager
)
signal_b = self.bot.get_device_object_from_bec(
object_name=blade_b, signal_name=self.name, device_manager=device_manager
)
signal_l = self.left.get_device_object_from_bec(
object_name=blade_l, signal_name=self.name, device_manager=device_manager
)
# Set compute methods for blade signals and virtual signals
self.top.set_compute_method(self._compute_blade_signal, signal=signal_t)
self.right.set_compute_method(self._compute_blade_signal, signal=signal_r)
self.bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
self.left.set_compute_method(self._compute_blade_signal, signal=signal_l)
self.intensity.set_compute_method(
self._compute_intensity, top=self.top, right=self.right, bot=self.bot, left=self.left
)
self.pos_x.set_compute_method(
self._compute_pos_x, left=self.left, top=self.top, right=self.right, bot=self.bot
)
self.pos_y.set_compute_method(
self._compute_pos_y, left=self.left, top=self.top, right=self.right, bot=self.bot
)
self.diagonal.set_compute_method(
self._compute_diagonal, left=self.left, top=self.top, right=self.right, bot=self.bot
)
def _compute_blade_signal(self, signal: Signal) -> float:
return signal.get()
def _compute_intensity(self, top: Signal, right: Signal, bot: Signal, left: Signal) -> float:
intensity = top.get() + right.get() + bot.get() + left.get()
return intensity
def _compute_pos_x(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float:
sum_left = left.get() + top.get()
sum_right = right.get() + bot.get()
sum_total = sum_left + sum_right
if sum_total == 0:
return 0.0
return (sum_left - sum_right) / sum_total
def _compute_pos_y(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float:
sum_top = top.get() + right.get()
sum_bot = bot.get() + left.get()
sum_total = sum_top + sum_bot
if sum_total == 0:
return 0.0
return (sum_top - sum_bot) / sum_total
def _compute_diagonal(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float:
sum_diag1 = left.get() + right.get()
sum_diag2 = top.get() + bot.get()
sum_total = sum_diag1 + sum_diag2
if sum_total == 0:
return 0.0
return (sum_diag1 - sum_diag2) / sum_total
class BPMRio(BPM):
def __init__(
self,
name,
blade_t: str,
blade_r: str,
blade_b: str,
blade_l: str,
rio:str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, blade_t=blade_t, blade_r=blade_r, blade_b=blade_b, blade_l=blade_l, device_manager=device_manager, scan_info=scan_info, **kwargs)
rio:GalilRIO = self.top.get_device_object_from_bec(object_name=rio, signal_name=name, device_manager=device_manager)

View File

@@ -0,0 +1,115 @@
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
from ophyd import Component as Cpt, Kind, Signal
from typing import Literal
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
(0, 0, 1): int(1e4),
(0, 1, 0): int(1e5),
(0, 1, 1): int(1e6),
(1, 0, 0): int(1e7),
(1, 0, 1): int(1e8),
(1, 1, 0): int(1e9),
}
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
(0, 0, 0): int(1e5),
(0, 0, 1): int(1e6),
(0, 1, 0): int(1e7),
(0, 1, 1): int(1e8),
(1, 0, 0): int(1e9),
(1, 0, 1): int(1e10),
(1, 1, 0): int(1e11),
}
_GAIN_TO_BITS: dict[int, tuple] = {}
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
_GAIN_TO_BITS[_gain] = (*_bits, True)
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
_GAIN_TO_BITS[_gain] = (*_bits, False)
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class BPMControl(PSIPseudoDeviceBase):
USER_ACCESS = ["set_gain", "set_coupling"]
gain = Cpt(BECProcessedSignal, name="gain", model_config=None, kind=Kind.config, doc="Gain of the amplifier")
coupling = Cpt(BECProcessedSignal, name="coupling", model_config=None, kind=Kind.config, doc="Coupling of the amplifier")
speed = Cpt(BECProcessedSignal, name="speed", model_config=None, kind=Kind.config, doc="Speed of the amplifier")
def __init__(
self,
name,
gain_lsb,
gain_mid,
gain_msb,
coupling,
speed_mode,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# Get all related signals to inputs
self._gain_lsb = self.gain.get_device_object_from_bec(
object_name=gain_lsb, signal_name=self.name, device_manager=device_manager
)
self._gain_mid = self.gain.get_device_object_from_bec(
object_name=gain_mid, signal_name=self.name, device_manager=device_manager
)
self._gain_msb = self.gain.get_device_object_from_bec(
object_name=gain_msb, signal_name=self.name, device_manager=device_manager
)
self._coupling = self.gain.get_device_object_from_bec(
object_name=coupling, signal_name=self.name, device_manager=device_manager
)
self._speed_mode = self.gain.get_device_object_from_bec(
object_name=speed_mode, signal_name=self.name, device_manager=device_manager
)
# Set computed signals
self.gain.set_compute_method(self._compute_gain, msb=self._gain_msb, mid=self._gain_mid, lsb=self._gain_lsb, speed_mode=self._speed_mode)
self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling)
self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode)
def set_gain(self, gain:Literal[1e3,1e4,1e5,1e6,1e8,1e9,1e10,1e11]) -> None:
"""
Set the gain of the amplifiert.
Args:
gain (Literal): Must be one of ....
"""
gain_int = int(gain)
if gain_int not in VALID_GAINS:
raise ValueError(f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}")
msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int]
self._gain_msb.set(bool(msb)).wait(timeout=2)
self._gain_lsb.set(bool(lsb)).wait(timeout=2)
self._gain_mid.set(bool(mid)).wait(timeout=2)
self._speed_mode.set(bool(use_low_noise))
def set_coupling(self, coupling:Literal["AC", "DC"]) -> None:
if coupling not in ["AC", "DC"]:
raise ValueError(f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'")
self._coupling.set(coupling=="DC").wait(timeout=2)
def _compute_gain(self, msb:Signal, mid:Signal, lsb:Signal, speed_mode:Signal) -> int:
bits = (msb.get(), mid.get(), lsb.get())
speed_mode = speed_mode.get()
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
def _compute_coupling(self, coupling:Signal) -> str:
return "DC" if coupling.get() else "AC"
def _compute_speed(self, speed:Signal) -> str:
return "low_speed" if speed.get() else "high_speed"

View File

@@ -0,0 +1 @@
# from ophyd