wip at the beamline
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m58s

This commit is contained in:
x12sa
2026-03-19 13:05:07 +01:00
parent 9eee4ee1f7
commit 3dcdbc5f63
5 changed files with 125 additions and 2 deletions

View File

@@ -0,0 +1,24 @@
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
bpm1:
readoutPriority: baseline
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
blade_t: galilrioesxbox.analog_in.ch0
blade_r: galilrioesxbox.analog_in.ch1
blade_b: galilrioesxbox.analog_in.ch2
blade_l: galilrioesxbox.analog_in.ch3
enabled: true
readOnly: false
softwareTrigger: true
needs:
- galilrioesxbox

View File

@@ -48,7 +48,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshopen(self):
"""Open the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(1)
return self.device_manager.devices["fsh"].fshopen()
else:
self.shutter.put(1)
@@ -56,7 +55,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshclose(self):
"""Close the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(0)
return self.device_manager.devices["fsh"].fshclose()
else:
self.shutter.put(0)

View File

@@ -0,0 +1,100 @@
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal, ProcessedSignalModel
from ophyd_devices import PSIDeviceBase
from ophyd import Kind, Signal
from ophyd import Component as Cpt
class BPM(PSIDeviceBase):
# Blade signals, a,b,c,d
top = Cpt(BECProcessedSignal, name="top", model=None,kind=Kind.config, doc="... top blade")
right = Cpt(BECProcessedSignal, name="right", model=None,kind=Kind.config, doc="... right blade")
bot = Cpt(BECProcessedSignal, name="bot", model=None,kind=Kind.config, doc="... bot blade")
left = Cpt(BECProcessedSignal, name="left", model=None,kind=Kind.config, doc="... left blade")
# Virtual signals
# pos_x = Cpt(BECProcessedSignal, name="pos_x", model=None,kind=Kind.config, doc="... pos_x")
# pos_y = Cpt(BECProcessedSignal, name="pos_y", model=None,kind=Kind.config, doc="... pos_y")
# diagonal = Cpt(BECProcessedSignal, name="diagonal", model=None,kind=Kind.config, doc="... diagonal")
intensity = Cpt(BECProcessedSignal, name="intensity", model=None,kind=Kind.config, doc="... intensity")
def __init__(self,
name,
blade_t: str,
blade_r: str,
blade_b: str,
blade_l: str,
device_manager=None,
**kwargs):
super().__init__(name=name, device_manager=device_manager, **kwargs)
self.top.set_model(devices= {"signal", blade_t}, compute_method=self._compute_blade_signal)
self.top.set_model(devices= {"signal", blade_r}, compute_method=self._compute_blade_signal)
self.top.set_model(devices= {"signal", blade_b}, compute_method=self._compute_blade_signal)
self.top.set_model(devices= {"signal", blade_l}, compute_method=self._compute_blade_signal)
self.intensity
def wait_for_connection(self, *args, **kwargs):
# Blade models...
model = ProcessedSignalModel(
devices = {"signal" : self._blade_names[0]},
compute_method=self._compute_blade_signal,
return_type=float,
)
self.top.set_model(model)
self.top.wait_for_connection()
model = ProcessedSignalModel(
devices = {"signal" : self._blade_names[1]},
compute_method=self._compute_blade_signal,
return_type=float,
)
self.right.set_model(model)
self.right.wait_for_connection()
model = ProcessedSignalModel(
devices = {"signal" : self._blade_names[2]},
compute_method=self._compute_blade_signal,
return_type=float,
)
self.bot.set_model(model)
self.bot.wait_for_connection()
model = ProcessedSignalModel(
devices = {"signal" : self._blade_names[3]},
compute_method=self._compute_blade_signal,
return_type=float,
)
self.left.set_model(model)
self.left.wait_for_connection()
# Intensity
model = ProcessedSignalModel(
devices = {
"top" : f"{self.name}_{self.top.name}",
"right" : f"{self.name}_{self.right.name}",
"bot" : f"{self.name}_{self.bot.name}",
"left" : f"{self.name}_{self.left.name}",
},
compute_method=self._compute_intensity,
return_type=float
)
self.intensity.set_model(model)
dev_objs = {
"top" : self.top,
"right" : self.right,
"bot" : self.bot,
"left" : self.left,
}
self.intensity.set_device_object(device_objects=dev_objs)
self.intensity.wait_for_connection()
return super().wait_for_connection(*args, **kwargs)
def compute(self, signal_1: Signal, signal_2: Signal) -> float:
"""Compute the value of the pseudo signal based on the values of signal_1 and signal_2."""
return float(signal_1.get() + signal_2.get())
def _compute_blade_signal(self, signal:Signal) -> float:
return signal.get()
def _compute_intensity(self, top, right, bot, left) -> float:
intensity = top.get() + right.get() + bot.get() + left.get()
return intensity

View File

@@ -0,0 +1 @@
# from ophyd