Add simulated devices for endstation, fsh and ddg1 #197
@@ -172,7 +172,7 @@ class LamNIOpticsMixin:
|
||||
|
||||
def leye_out(self):
|
||||
self.loptics_in()
|
||||
dev.omnyfsh.fshopen()
|
||||
dev.fsh.fshopen()
|
||||
leyey_out = self._get_user_param_safe("leyey", "out")
|
||||
umv(dev.leyey, leyey_out)
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ class FlomniOpticsMixin:
|
||||
return param.get(var)
|
||||
|
||||
def feye_out(self):
|
||||
dev.omnyfsh.fshclose()
|
||||
dev.fsh.fshclose()
|
||||
self.foptics_in()
|
||||
self.flomnigui_show_xeyealign()
|
||||
self.xrayeye_update_frame()
|
||||
|
||||
@@ -57,14 +57,14 @@ class XrayEyeAlign:
|
||||
|
||||
self.gui.on_live_view_enabled(True)
|
||||
|
||||
dev.omnyfsh.fshopen()
|
||||
dev.fsh.fshopen()
|
||||
|
||||
time.sleep(0.5)
|
||||
# stop live view
|
||||
if not keep_shutter_open:
|
||||
self.gui.on_live_view_enabled(False)
|
||||
time.sleep(0.1)
|
||||
dev.omnyfsh.fshclose()
|
||||
dev.fsh.fshclose()
|
||||
print("Received new frame.")
|
||||
else:
|
||||
print("Staying in live view, shutter is and remains open!")
|
||||
@@ -233,7 +233,7 @@ class XrayEyeAlign:
|
||||
|
||||
if keep_shutter_open:
|
||||
if self.flomni.OMNYTools.yesno("Close the shutter now?", "y"):
|
||||
dev.omnyfsh.fshclose()
|
||||
dev.fsh.fshclose()
|
||||
self.gui.on_live_view_enabled(False)
|
||||
print("setting 'XOMNYI-XEYE-ACQ:0'")
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ class OMNYOpticsMixin:
|
||||
dev.oeyez.controller.socket_put_confirmed("axspeed[7]=10000")
|
||||
|
||||
def oeye_out(self):
|
||||
dev.omnyfsh.fshclose()
|
||||
dev.fsh.fshclose()
|
||||
if self.OMNYTools.yesno("Did you move in the optics?"):
|
||||
umv(dev.oeyez, -2)
|
||||
self._oeyey_mv(-60.3)
|
||||
|
||||
@@ -154,7 +154,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
|
||||
# Connection to redis endpoints
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.getting_shutter_status, MessageEndpoints.device_readback("omnyfsh")
|
||||
self.getting_shutter_status, MessageEndpoints.device_readback("fsh")
|
||||
)
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0])
|
||||
@@ -342,7 +342,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
|
||||
def _init_gui_trigger(self):
|
||||
self.dev.omny_xray_gui.read()
|
||||
self.dev.omnyfsh.read()
|
||||
self.dev.fsh.read()
|
||||
|
||||
################################################################################
|
||||
# Device Connection logic
|
||||
@@ -451,15 +451,15 @@ class XRayEye(BECWidget, QWidget):
|
||||
logger.info(f"Shutter changed from GUI to: {enabled}")
|
||||
self.shutter_toggle.blockSignals(True)
|
||||
if enabled:
|
||||
self.dev.omnyfsh.fshopen()
|
||||
self.dev.fsh.fshopen()
|
||||
else:
|
||||
self.dev.omnyfsh.fshclose()
|
||||
self.dev.fsh.fshclose()
|
||||
# self.shutter_toggle.checked = enabled
|
||||
self.shutter_toggle.blockSignals(False)
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def getting_shutter_status(self, data, meta):
|
||||
shutter_open = bool(data.get("signals").get("omnyfsh_shutter").get("value"))
|
||||
shutter_open = bool(data.get("signals").get("fsh_shutter").get("value"))
|
||||
self.shutter_toggle.blockSignals(True)
|
||||
self.shutter_toggle.checked = shutter_open
|
||||
self.shutter_toggle.blockSignals(False)
|
||||
@@ -567,7 +567,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
)
|
||||
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.getting_shutter_status, MessageEndpoints.device_readback("omnyfsh")
|
||||
self.getting_shutter_status, MessageEndpoints.device_readback("fsh")
|
||||
)
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0])
|
||||
|
||||
@@ -477,15 +477,16 @@ flomni_temphum:
|
||||
readoutPriority: baseline
|
||||
# ############################################################
|
||||
# ########## OMNY / flOMNI / LamNI fast shutter ##############
|
||||
# REPLACED BY SIMULATED BEAMLINE DEVICE !!!!!!!!!!!!!!!!
|
||||
# ############################################################
|
||||
omnyfsh:
|
||||
description: omnyfsh connects to fast shutter at X12 if device fsh exists
|
||||
deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastShutter
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
# omnyfsh:
|
||||
# description: omnyfsh connects to fast shutter at X12 if device fsh exists
|
||||
# deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastShutter
|
||||
# deviceConfig: {}
|
||||
# enabled: true
|
||||
# onFailure: buffer
|
||||
# readOnly: false
|
||||
# readoutPriority: baseline
|
||||
############################################################
|
||||
#################### GUI Signals ###########################
|
||||
############################################################
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
ddg1:
|
||||
description: Simulated main delay Generator for triggering
|
||||
deviceClass: csaxs_bec.devices.sim.simulated_beamline_devices.SimulatedDDG1
|
||||
enabled: true
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-CPCL-DDG1:'
|
||||
onFailure: raise
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: true
|
||||
fsh:
|
||||
description: Simulated fast shutter manual control and readback
|
||||
deviceClass: csaxs_bec.devices.sim.simulated_beamline_devices.cSAXSSimulatedFastShutter
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-ES1-TTL:'
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
|
||||
flomni:
|
||||
- !include ../ptycho_flomni.yaml
|
||||
@@ -0,0 +1,59 @@
|
||||
"""This module contains simulated versions of beamline devices relevant for running simulated experiments."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Device
|
||||
from ophyd_devices import StatusBase
|
||||
from ophyd_devices.tests.utils import patched_device
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.ddg_1 import DDG1
|
||||
from csaxs_bec.devices.epics.fast_shutter import cSAXSFastEpicsShutter
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class cSAXSSimulatedFastShutter(Device):
|
||||
"""
|
||||
Simulated version of the cSAXS Fast Shutter.
|
||||
|
||||
To set up initial signal to specific values, please use the example below:
|
||||
fsh.shutter._read_pv.mock_data = 1 #
|
||||
-> with 'shutter' being a signal of the shutter device
|
||||
|
||||
"""
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
with patched_device(cSAXSFastEpicsShutter, *args, **kwargs) as fsh:
|
||||
# fsh.shutter._read_pv.mock_data = 1 # Set mock data if needed
|
||||
return fsh
|
||||
|
||||
|
||||
class DDG1WithPatchedStage(DDG1):
|
||||
"""
|
||||
Simulated version of the DDG1 with patched stage method.
|
||||
We inherit here to be able to overwrite the on_stage method if needed, without compromising the rest of the device's functionality.
|
||||
"""
|
||||
|
||||
def on_stage(self):
|
||||
logger.warning(
|
||||
f"Staging of {self.name}. This is a simulated device of {self.__class__.__name__}."
|
||||
)
|
||||
super().on_stage()
|
||||
|
||||
def on_trigger(self):
|
||||
"""
|
||||
We overwrite and patch the on_trigger method. This resolves immediately now and
|
||||
sets the status to finished, instead of waiting for the trigger logic to complete.
|
||||
"""
|
||||
# Change logic if needed
|
||||
status = StatusBase(obj=self)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
|
||||
class SimulatedDDG1(Device):
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
with patched_device(DDG1WithPatchedStage, *args, **kwargs) as ddg1:
|
||||
return ddg1
|
||||
Reference in New Issue
Block a user