Add simulated devices for endstation, fsh and ddg1 #197

Merged
appel_c merged 3 commits from feat/add-simulated-bl-endstation-devices into main 2026-04-22 17:21:09 +02:00
10 changed files with 101 additions and 20 deletions
@@ -172,7 +172,7 @@ class LamNIOpticsMixin:
def leye_out(self):
self.loptics_in()
dev.omnyfsh.fshopen()
dev.fsh.fshopen()
leyey_out = self._get_user_param_safe("leyey", "out")
umv(dev.leyey, leyey_out)
@@ -16,7 +16,7 @@ class FlomniOpticsMixin:
return param.get(var)
def feye_out(self):
dev.omnyfsh.fshclose()
dev.fsh.fshclose()
self.foptics_in()
self.flomnigui_show_xeyealign()
self.xrayeye_update_frame()
@@ -57,14 +57,14 @@ class XrayEyeAlign:
self.gui.on_live_view_enabled(True)
dev.omnyfsh.fshopen()
dev.fsh.fshopen()
time.sleep(0.5)
# stop live view
if not keep_shutter_open:
self.gui.on_live_view_enabled(False)
time.sleep(0.1)
dev.omnyfsh.fshclose()
dev.fsh.fshclose()
print("Received new frame.")
else:
print("Staying in live view, shutter is and remains open!")
@@ -233,7 +233,7 @@ class XrayEyeAlign:
if keep_shutter_open:
if self.flomni.OMNYTools.yesno("Close the shutter now?", "y"):
dev.omnyfsh.fshclose()
dev.fsh.fshclose()
self.gui.on_live_view_enabled(False)
print("setting 'XOMNYI-XEYE-ACQ:0'")
@@ -48,7 +48,7 @@ class OMNYOpticsMixin:
dev.oeyez.controller.socket_put_confirmed("axspeed[7]=10000")
def oeye_out(self):
dev.omnyfsh.fshclose()
dev.fsh.fshclose()
if self.OMNYTools.yesno("Did you move in the optics?"):
umv(dev.oeyez, -2)
self._oeyey_mv(-60.3)
@@ -154,7 +154,7 @@ class XRayEye(BECWidget, QWidget):
# Connection to redis endpoints
self.bec_dispatcher.connect_slot(
self.getting_shutter_status, MessageEndpoints.device_readback("omnyfsh")
self.getting_shutter_status, MessageEndpoints.device_readback("fsh")
)
self.bec_dispatcher.connect_slot(
self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0])
@@ -342,7 +342,7 @@ class XRayEye(BECWidget, QWidget):
def _init_gui_trigger(self):
self.dev.omny_xray_gui.read()
self.dev.omnyfsh.read()
self.dev.fsh.read()
################################################################################
# Device Connection logic
@@ -451,15 +451,15 @@ class XRayEye(BECWidget, QWidget):
logger.info(f"Shutter changed from GUI to: {enabled}")
self.shutter_toggle.blockSignals(True)
if enabled:
self.dev.omnyfsh.fshopen()
self.dev.fsh.fshopen()
else:
self.dev.omnyfsh.fshclose()
self.dev.fsh.fshclose()
# self.shutter_toggle.checked = enabled
self.shutter_toggle.blockSignals(False)
@SafeSlot(dict, dict)
def getting_shutter_status(self, data, meta):
shutter_open = bool(data.get("signals").get("omnyfsh_shutter").get("value"))
shutter_open = bool(data.get("signals").get("fsh_shutter").get("value"))
self.shutter_toggle.blockSignals(True)
self.shutter_toggle.checked = shutter_open
self.shutter_toggle.blockSignals(False)
@@ -567,7 +567,7 @@ class XRayEye(BECWidget, QWidget):
)
self.bec_dispatcher.disconnect_slot(
self.getting_shutter_status, MessageEndpoints.device_readback("omnyfsh")
self.getting_shutter_status, MessageEndpoints.device_readback("fsh")
)
self.bec_dispatcher.disconnect_slot(
self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0])
+9 -8
View File
@@ -477,15 +477,16 @@ flomni_temphum:
readoutPriority: baseline
# ############################################################
# ########## OMNY / flOMNI / LamNI fast shutter ##############
# REPLACED BY SIMULATED BEAMLINE DEVICE !!!!!!!!!!!!!!!!
# ############################################################
omnyfsh:
description: omnyfsh connects to fast shutter at X12 if device fsh exists
deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastShutter
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
# omnyfsh:
# description: omnyfsh connects to fast shutter at X12 if device fsh exists
# deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastShutter
# deviceConfig: {}
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
############################################################
#################### GUI Signals ###########################
############################################################
@@ -0,0 +1,21 @@
ddg1:
description: Simulated main delay Generator for triggering
deviceClass: csaxs_bec.devices.sim.simulated_beamline_devices.SimulatedDDG1
enabled: true
deviceConfig:
prefix: 'X12SA-CPCL-DDG1:'
onFailure: raise
readOnly: false
readoutPriority: baseline
softwareTrigger: true
fsh:
description: Simulated fast shutter manual control and readback
deviceClass: csaxs_bec.devices.sim.simulated_beamline_devices.cSAXSSimulatedFastShutter
deviceConfig:
prefix: 'X12SA-ES1-TTL:'
onFailure: raise
enabled: true
readoutPriority: monitored
flomni:
- !include ../ptycho_flomni.yaml
View File
@@ -0,0 +1,59 @@
"""This module contains simulated versions of beamline devices relevant for running simulated experiments."""
from __future__ import annotations
from bec_lib.logger import bec_logger
from ophyd import Device
from ophyd_devices import StatusBase
from ophyd_devices.tests.utils import patched_device
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import DDG1
from csaxs_bec.devices.epics.fast_shutter import cSAXSFastEpicsShutter
logger = bec_logger.logger
class cSAXSSimulatedFastShutter(Device):
"""
Simulated version of the cSAXS Fast Shutter.
To set up initial signal to specific values, please use the example below:
fsh.shutter._read_pv.mock_data = 1 #
-> with 'shutter' being a signal of the shutter device
"""
def __new__(cls, *args, **kwargs):
with patched_device(cSAXSFastEpicsShutter, *args, **kwargs) as fsh:
# fsh.shutter._read_pv.mock_data = 1 # Set mock data if needed
return fsh
class DDG1WithPatchedStage(DDG1):
"""
Simulated version of the DDG1 with patched stage method.
We inherit here to be able to overwrite the on_stage method if needed, without compromising the rest of the device's functionality.
"""
def on_stage(self):
logger.warning(
f"Staging of {self.name}. This is a simulated device of {self.__class__.__name__}."
)
super().on_stage()
def on_trigger(self):
"""
We overwrite and patch the on_trigger method. This resolves immediately now and
sets the status to finished, instead of waiting for the trigger logic to complete.
"""
# Change logic if needed
status = StatusBase(obj=self)
status.set_finished()
return status
class SimulatedDDG1(Device):
def __new__(cls, *args, **kwargs):
with patched_device(DDG1WithPatchedStage, *args, **kwargs) as ddg1:
return ddg1