Compare commits

..

11 Commits

Author SHA1 Message Date
x12sa
60286c8248 wip adjusting to V3
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m25s
CI for csaxs_bec / test (pull_request) Failing after 1m27s
2026-01-29 11:33:57 +01:00
x01dc
5a4b0fbb4c set fixed on the script
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m25s
CI for csaxs_bec / test (push) Failing after 1m27s
2026-01-29 10:00:47 +01:00
x01dc
2dc9985a85 colorbar changed to greys 2026-01-29 10:00:47 +01:00
f5180d5cab wip unify the live mode on cameras 2026-01-29 10:00:47 +01:00
x01dc
d5c199cbd7 feat(xray_eye): alignment gui and script adapted to not use epics gui device 2026-01-29 10:00:47 +01:00
d9512e6e22 wip ids camera subscription callback - run false also put directly from init kwarg 2026-01-29 10:00:47 +01:00
7c346b7cd1 wip ids camera subscription callback - run false 2026-01-29 10:00:47 +01:00
369c6c405e wip ids camera subscription callback 2026-01-29 10:00:47 +01:00
e2e30a2803 wip ids camera live mode signal 2026-01-29 10:00:47 +01:00
8a8471cc16 fix(bec_widgets): removed omny alignment old gui 2026-01-29 10:00:47 +01:00
5990ae54c4 feat(allied-vision-camera): Add allied vision camera integration 2026-01-29 10:00:47 +01:00
46 changed files with 899 additions and 2322 deletions

View File

@@ -1004,6 +1004,20 @@ class FlomniAlignmentMixin:
with open(os.path.join(dir_path, "ptychotomoalign_Cy3.txt"), "r") as file:
tomo_alignment_fit[1][4] = file.readline()
tomo_alignment_fit[0][0] = gui.flomni.xeyegui.XRayEye.Waveform.fit_x_SineModel.dap_params['amplitude']
tomo_alignment_fit[0][1] = gui.flomni.xeyegui.XRayEye.Waveform.fit_x_SineModel.dap_params['frequency']
tomo_alignment_fit[0][2] = gui.flomni.xeyegui.XRayEye.Waveform.fit_x_SineModel.dap_params['shift']
tomo_alignment_fit[1][0] = gui.flomni.xeyegui.XRayEye.Waveform_0.fit_y_SineModel.dap_params['amplitude']
tomo_alignment_fit[1][1] = gui.flomni.xeyegui.XRayEye.Waveform_0.fit_y_SineModel.dap_params['frequency']
tomo_alignment_fit[1][2] = gui.flomni.xeyegui.XRayEye.Waveform_0.fit_y_SineModel.dap_params['shift']
print("applying vertical default values from mirror calibration, not from fit!")
tomo_alignment_fit[1][0] = 0
tomo_alignment_fit[1][1] = 0
tomo_alignment_fit[1][2] = 0
tomo_alignment_fit[1][3] = 0
tomo_alignment_fit[1][4] = 0
print("New alignment parameters loaded:")
print(
f"X Amplitude {tomo_alignment_fit[0][0]}, "

View File

@@ -1,6 +1,6 @@
import builtins
from bec_widgets.cli.client import BECDockArea
from bec_widgets.cli.client import AdvancedDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
@@ -26,10 +26,11 @@ class flomniGuiTools:
self.gui = self.client.gui
def flomnigui_show_gui(self):
if "flomni" in self.gui.windows:
self.gui.flomni.show()
else:
self.gui.new("flomni")
self.gui.new("flomni")
# if "flomni" in self.gui.windows:
# self.gui.flomni.show()
# else:
# self.gui.new("flomni")
def flomnigui_stop_gui(self):
self.gui.flomni.hide()
@@ -37,24 +38,23 @@ class flomniGuiTools:
def flomnigui_raise(self):
self.gui.flomni.raise_window()
# def flomnigui_show_xeyealign(self):
# self.flomnigui_show_gui()
# if self.xeyegui is None:
# self.flomnigui_remove_all_docks()
# self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# # start live
# if not dev.cam_xeye.live_mode:
# dev.cam_xeye.live_mode = True
def flomnigui_show_xeyealign(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# start live
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
self.xeyegui = self.gui.flomni.new("XRayEye",object_name='xrayeye')
# start live
if not dev.cam_xeye.live_mode_enabled.get():
# dev.cam_xeye.live_mode = True
dev.cam_xeye.live_mode_enabled.put(True)
self.xeyegui.switch_tab('alignment')
def flomnigui_show_xeyealign_fittab(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("XRayEye")
self.xeyegui.switch_tab('fit')
def _flomnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui,"flomni"):
@@ -67,7 +67,7 @@ class flomniGuiTools:
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"):
self.flomnigui_remove_all_docks()
camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
camera_gripper_image = self.gui.flomni.new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
camera_gripper_image.image(("cam_flomni_gripper", "preview"))
camera_gripper_image.lock_aspect_ratio = True
@@ -78,7 +78,7 @@ class flomniGuiTools:
dev.cam_flomni_gripper.start_live_mode()
else:
print("Cannot open camera_gripper. Device does not exist.")
camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
camera_overview_image = self.gui.flomni.new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
camera_overview_image.image(("cam_flomni_overview", "preview"))
camera_overview_image.lock_aspect_ratio = True
@@ -102,7 +102,7 @@ class flomniGuiTools:
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("idle_text_box"):
self.flomnigui_remove_all_docks()
idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
idle_text_box = self.gui.flomni.new("TextBox")
text = (
"<pre>"
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n"
@@ -162,7 +162,7 @@ class flomniGuiTools:
self.pdf_viewer = self.gui.flomni.new(widget="PdfViewerWidget")
# --- Load PDF ---------------------------------------------------------
self.pdf_viewer.PdfViewerWidget.load_pdf(str(pdf_file.resolve()))
self.PdfViewerWidget.load_pdf(str(pdf_file.resolve()))
print(f"\nLoaded: {pdf_file.name}\n")
@@ -179,7 +179,7 @@ class flomniGuiTools:
if self._flomnigui_check_attribute_not_exists("progressbar"):
self.flomnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar")
self.progressbar = self.gui.flomni.new("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
@@ -194,7 +194,7 @@ class flomniGuiTools:
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.flomni.new(name="progress_text").new("TextBox")
self.text_box = self.gui.flomni.new("TextBox")
self._flomnigui_update_progress()

View File

@@ -5,9 +5,11 @@ import os
import time
from typing import TYPE_CHECKING
import numpy as np
from bec_lib import bec_logger
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
logger = bec_logger.logger
# import builtins to avoid linter errors
@@ -22,7 +24,7 @@ if TYPE_CHECKING:
class XrayEyeAlign:
# pixel calibration, multiply to get mm
labview=False
test_wo_movements = True
PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning
def __init__(self, client, flomni: Flomni) -> None:
@@ -34,209 +36,194 @@ class XrayEyeAlign:
self.flomni.reset_correction()
self.flomni.reset_tomo_alignment_fit()
@property
def gui(self):
return self.flomni.xeyegui
def _reset_init_values(self):
self.shift_xy = [0, 0]
self._xray_fov_xy = [0, 0]
def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self, keep_shutter_open=False):
def update_frame(self,keep_shutter_open=False):
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
if not self.labview:
self.flomni.flomnigui_show_xeyealign()
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
# self.flomni.flomnigui_show_xeyealign()
if not dev.cam_xeye.live_mode_enabled.get():
dev.cam_xeye.live_mode_enabled.put(True)
epics_put("XOMNYI-XEYE-ACQ:0", 1)
if self.labview:
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
self.gui.on_live_view_enabled(True)
fshopen()
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
dev.omnyfsh.fshopen()
time.sleep(0.5)
# stop live view
if not keep_shutter_open:
epics_put("XOMNYI-XEYE-ACQ:0", 0)
self.gui.on_live_view_enabled(False)
time.sleep(0.1)
fshclose()
print("got new frame")
dev.omnyfsh.fshclose()
print("Received new frame.")
else:
print("Staying in live view, shutter is and remains open!")
def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable
umv(self.device_manager.devices.fsamroy, val)
if not self.test_wo_movements:
umv(self.device_manager.devices.fsamroy, val)
def get_tomo_angle(self):
return self.device_manager.devices.fsamroy.readback.get()
def update_fov(self, k: int):
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
self._xray_fov_xy[0] = max(
getattr(dev.omny_xray_gui, f"width_x_{k}").get(), self._xray_fov_xy[0]
)
self._xray_fov_xy[1] = max(
getattr(dev.omny_xray_gui, f"width_y_{k}").get(), self._xray_fov_xy[1]
)
@property
def movement_buttons_enabled(self):
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
@movement_buttons_enabled.setter
def movement_buttons_enabled(self, enabled: bool):
enabled = int(enabled)
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
def movement_buttons_enabled(self, enablex: bool, enabley: bool):
self.gui.on_motors_enable(enablex, enabley)
def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
print(f"In alginment GUI: {msg}")
self.gui.user_message = msg
def align(self,keep_shutter_open=False):
def align(self, keep_shutter_open=False):
self.flomni.flomnigui_show_xeyealign()
if not keep_shutter_open:
print("This routine can be called with paramter keep_shutter_open=True to keep the shutter always open")
print(
"This routine can be called with paramter keep_shutter_open=True to keep the shutter always open"
)
self.send_message("Getting things ready. Please wait...")
#potential unresolved movement requests to zero
epics_put("XOMNYI-XEYE-MVX:0", 0)
epics_put("XOMNYI-XEYE-MVY:0", 0)
self.gui.enable_submit_button(False)
# Initialize xray align device
# clear potential pending movement requests
dev.omny_xray_gui.mvx.set(0)
dev.omny_xray_gui.mvy.set(0)
# reset submit channel
dev.omny_xray_gui.submit.set(0)
self.movement_buttons_enabled(False, False)
# reset shift xy and fov params
self._reset_init_values()
self.flomni.lights_off()
self.flomni.flomnigui_show_xeyealign()
self.flomni.flomnigui_raise()
# self.flomni.flomnigui_show_xeyealign()
# self.flomni.flomnigui_raise()
self.tomo_rotate(0)
epics_put("XOMNYI-XEYE-ANGLE:0", 0)
if not self.test_wo_movements:
self.tomo_rotate(0)
self.flomni.feye_in()
self.flomni.feye_in()
self.flomni.laser_tracker_on()
self.flomni.feedback_enable_with_reset()
# disable movement buttons
self.movement_buttons_enabled = False
self.movement_buttons_enabled(False, False)
sample_name = self.flomni.sample_get_name(0)
epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", sample_name)
self.gui.sample_name = sample_name
# this makes sure we are in a defined state
self.flomni.feedback_disable()
epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
if not self.test_wo_movements:
self.flomni.fosa_out()
self.flomni.fosa_out()
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in - 0.25)
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in - 0.25)
self.flomni.ffzp_in()
self.flomni.ffzp_in()
self.update_frame(keep_shutter_open)
# enable submit buttons
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-STEP:0", 0)
self.gui.enable_submit_button(True)
dev.omny_xray_gui.step.set(0).wait()
self.send_message("Submit center value of FZP.")
k = 0
while True:
if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") / 2 * self.PIXEL_CALIBRATION # in mm
self.alignment_values[k] = val_x
if dev.omny_xray_gui.submit.get() == 1:
self.alignment_values[k] = (
getattr(dev.omny_xray_gui, f"xval_x_{k}").get() / 2 * self.PIXEL_CALIBRATION
) # in mm
print(f"Clicked position {k}: x {self.alignment_values[k]}")
rtx_position = dev.rtx.readback.get() / 1000
print(f"Current rtx position {rtx_position}")
self.alignment_values[k] -= rtx_position
print(f"Corrected position {k}: x {self.alignment_values[k]}")
# reset submit channel
dev.omny_xray_gui.submit.set(0)
if k == 0: # received center value of FZP
self.send_message("please wait ...")
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.movement_buttons_enabled(False, False)
self.gui.enable_submit_button(False)
self.flomni.feedback_disable()
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in)
if not self.test_wo_movements:
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in)
self.flomni.foptics_out()
self.flomni.foptics_out()
self.flomni.feedback_disable()
umv(dev.fsamx, fsamx_in - 0.25)
time.sleep(0.5)
if self.labview:
self.update_frame(keep_shutter_open)
epics_put("XOMNYI-XEYE-RECBG:0", 1)
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
time.sleep(0.5)
print("waiting for background frame...")
umv(dev.fsamx, fsamx_in)
time.sleep(0.5)
self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open)
self.send_message("Adjust sample height and submit center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled = True
self.send_message("Step 1/5: Adjust sample height and submit center")
self.gui.enable_submit_button(True)
self.movement_buttons_enabled(True, True)
elif 1 <= k < 5: # received sample center value at samroy 0 ... 315
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
self.gui.enable_submit_button(False)
self.movement_buttons_enabled(False, False)
umv(dev.rtx, 0)
self.tomo_rotate(k * 45)
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
dev.omny_xray_gui.angle.set(self.get_tomo_angle())
self.update_frame(keep_shutter_open)
self.send_message("Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
self.send_message(f"Step {k+1}/5: Submit sample center")
self.gui.enable_submit_button(True)
self.movement_buttons_enabled(True, False)
self.update_fov(k)
elif k == 5: # received sample center value at samroy 270 and done
self.send_message("done...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.movement_buttons_enabled = False
self.gui.enable_submit_button(False)
self.movement_buttons_enabled(False, False)
self.update_fov(k)
break
k += 1
epics_put("XOMNYI-XEYE-STEP:0", k)
dev.omny_xray_gui.step.set(k)
_xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
_xrayeyalignmvx = dev.omny_xray_gui.mvx.get()
if _xrayeyalignmvx != 0:
umvr(dev.rtx, _xrayeyalignmvx)
print(f"Current rtx position {dev.rtx.readback.get() / 1000}")
epics_put("XOMNYI-XEYE-MVX:0", 0)
if k > 0:
epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000)
time.sleep(3)
dev.omny_xray_gui.mvx.set(0)
self.update_frame(keep_shutter_open)
if k < 2:
# allow movements, store movements to calculate center
_xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
_xrayeyalignmvy = dev.omny_xray_gui.mvy.get()
if _xrayeyalignmvy != 0:
self.flomni.feedback_disable()
umvr(dev.fsamy, _xrayeyalignmvy / 1000)
if not self.test_wo_movements:
umvr(dev.fsamy, _xrayeyalignmvy / 1000)
time.sleep(2)
epics_put("XOMNYI-XEYE-MVY:0", 0)
dev.omny_xray_gui.mvy.set(0)
self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open)
time.sleep(0.2)
time.sleep(0.1)
self.write_output()
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
@@ -246,22 +233,17 @@ class XrayEyeAlign:
umv(dev.rtx, 0)
# free camera
if self.labview:
epics_put("XOMNYI-XEYE-ACQ:0", 2)
if keep_shutter_open and not self.labview:
if self.flomni.OMNYTools.yesno("Close the shutter now?","y"):
fshclose()
epics_put("XOMNYI-XEYE-ACQ:0", 0)
if not self.labview:
self.flomni.flomnigui_idle()
if keep_shutter_open:
if self.flomni.OMNYTools.yesno("Close the shutter now?", "y"):
dev.omnyfsh.fshclose()
self.gui.on_live_view_enabled(False)
print("setting 'XOMNYI-XEYE-ACQ:0'")
print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy"
f" = {fovy:.0f} microns"
)
print("Use the matlab routine to FIT the current alignment...")
print("Check the fit in the GUI...")
print("Then LOAD ALIGNMENT PARAMETERS by running flomni.read_alignment_offset()\n")
@@ -269,9 +251,33 @@ class XrayEyeAlign:
file = os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues")
if not os.path.exists(file):
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, "w") as alignment_values_file:
alignment_values_file.write("angle\thorizontal\n")
# Initialize an empty list to store fovx values
fovx_list = []
fovx_offsets = np.zeros(5) # holds offsets for k = 1..5
for k in range(1, 6):
fovx_offset = self.alignment_values[0] - self.alignment_values[k]
fovx_offsets[k - 1] = fovx_offset # store in array
fovx_x = (k - 1) * 45
fovx_list.append([fovx_x, fovx_offset * 1000]) # Append the data to the list
print(f"Writing to file new alignment: number {k}, value x {fovx_offset}")
alignment_values_file.write(f"{(k-1)*45}\t{fovx_offset*1000}\n")
alignment_values_file.write(f"{fovx_x}\t{fovx_offset * 1000}\n")
# Now build final numpy array:
data = np.array(
[
[0, 45, 90, 135, 180], # angles
fovx_offsets * 1000, # fovx_offset values
[0, 0, 0, 0, 0],
]
)
self.gui.submit_fit_array(data)
print(f"fit submited with {data}")
print("todo mirko: submitted data is 1000 fold in amplitude")
# self.flomni.flomnigui_show_xeyealign_fittab()

View File

@@ -1,6 +1,6 @@
import builtins
from bec_widgets.cli.client import BECDockArea
from bec_widgets.cli.client import AdvancedDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose

View File

@@ -83,20 +83,6 @@ class XRayEye(RPCBase):
Return the currently active ROI, or None if no ROI is active.
"""
@property
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@property
@rpc_call
def user_message(self):
@@ -111,6 +97,30 @@ class XRayEye(RPCBase):
None
"""
@rpc_call
def on_live_view_enabled(self, enabled: "bool"):
"""
None
"""
@rpc_call
def on_motors_enable(self, x_enable: "bool", y_enable: "bool"):
"""
Enable/Disable motor controls
Args:
x_enable(bool): enable x motor controls
y_enable(bool): enable y motor controls
"""
@rpc_call
def enable_submit_button(self, enable: "int"):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
@property
@rpc_call
def sample_name(self):
@@ -139,6 +149,18 @@ class XRayEye(RPCBase):
None
"""
@rpc_call
def switch_tab(self, tab: "str"):
"""
None
"""
@rpc_call
def submit_fit_array(self, fit_array):
"""
None
"""
class XRayEye2DControl(RPCBase):
@rpc_call

View File

@@ -1,140 +0,0 @@
from typing import TypedDict
from bec_widgets.utils.error_popups import SafeSlot
import os
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QWidget, QPushButton, QLineEdit, QLabel, QVBoxLayout
from bec_qthemes import material_icon
from bec_lib.logger import bec_logger
logger = bec_logger.logger
# class OmnyAlignmentUIComponents(TypedDict):
# moveRightButton: QPushButton
# moveLeftButton: QPushButton
# moveUpButton: QPushButton
# moveDownButton: QPushButton
# image: Image
class OmnyAlignment(BECWidget, QWidget):
USER_ACCESS = ["enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter","sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
ui_file = "./omny_alignment.ui"
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.ui.moveRightButton.setText("")
self.ui.moveRightButton.setIcon(
material_icon(icon_name="keyboard_arrow_right", **icon_options)
)
self.ui.moveLeftButton.setText("")
self.ui.moveLeftButton.setIcon(
material_icon(icon_name="keyboard_arrow_left", **icon_options)
)
self.ui.moveUpButton.setText("")
self.ui.moveUpButton.setIcon(
material_icon(icon_name="keyboard_arrow_up", **icon_options)
)
self.ui.moveDownButton.setText("")
self.ui.moveDownButton.setIcon(
material_icon(icon_name="keyboard_arrow_down", **icon_options)
)
self.ui.confirmButton.setText("OK")
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
@property
def enable_live_view(self):
return self.ui.liveViewSwitch.checked
@enable_live_view.setter
def enable_live_view(self, enable:bool):
self.ui.liveViewSwitch.checked = enable
@property
def user_message(self):
return self.ui.messageLineEdit.text()
@user_message.setter
def user_message(self, message:str):
self.ui.messageLineEdit.setText(message)
@property
def sample_name(self):
return self.ui.sampleLineEdit.text()
@sample_name.setter
def sample_name(self, message:str):
self.ui.sampleLineEdit.setText(message)
@SafeSlot(bool)
def on_live_view_enabled(self, enabled:bool):
from bec_widgets.widgets.plots.image.image import Image
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam_xeye")
return
image.disconnect_monitor("cam_xeye")
@property
def enable_move_buttons(self):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
return move_up.isEnabled() and move_down.isEnabled() and move_left.isEnabled() and move_right.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled:bool):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
move_up.setEnabled(enabled)
move_down.setEnabled(enabled)
move_left.setEnabled(enabled)
move_right.setEnabled(enabled)
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
widget = OmnyAlignment()
widget.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['omny_alignment.py']}

View File

@@ -1,125 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>988</width>
<height>821</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="2" column="2">
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="2">
<widget class="QPushButton" name="moveRightButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QPushButton" name="moveLeftButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QPushButton" name="moveUpButton">
<property name="text">
<string>Up</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QPushButton" name="moveDownButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QPushButton" name="confirmButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="2" column="0">
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="1">
<widget class="QLineEdit" name="sampleLineEdit"/>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="messageLineEdit"/>
</item>
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Sample</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Message</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="1" column="0" colspan="3">
<widget class="Image" name="image">
<property name="enable_toolbar" stdset="0">
<bool>false</bool>
</property>
<property name="inner_axes" stdset="0">
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam_xeye</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>
</property>
</widget>
</item>
<item row="0" column="3">
<widget class="ToggleSwitch" name="liveViewSwitch"/>
</item>
<item row="0" column="2">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Live View</string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
</property>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>Image</class>
<extends>QWidget</extends>
<header>image</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1,54 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment import OmnyAlignment
DOM_XML = """
<ui language='c++'>
<widget class='OmnyAlignment' name='omny_alignment'>
</widget>
</ui>
"""
class OmnyAlignmentPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = OmnyAlignment(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(OmnyAlignment.ICON_NAME)
def includeFile(self):
return "omny_alignment"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "OmnyAlignment"
def toolTip(self):
return "OmnyAlignment"
def whatsThis(self):
return self.toolTip()

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment_plugin import OmnyAlignmentPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(OmnyAlignmentPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -5,6 +5,7 @@ from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
@@ -21,7 +22,10 @@ from qtpy.QtWidgets import (
QToolButton,
QVBoxLayout,
QWidget,
QTextEdit,
QTabWidget
)
import time
logger = bec_logger.logger
CAMERA = ("cam_xeye", "image")
@@ -124,8 +128,8 @@ class XRayEye2DControl(BECWidget, QWidget):
class XRayEye(BECWidget, QWidget):
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
USER_ACCESS = ["active_roi", "user_message", "user_message.setter","on_live_view_enabled","on_motors_enable","enable_submit_button",
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter","switch_tab","submit_fit_array"]
PLUGIN = True
def __init__(self, parent=None, **kwargs):
@@ -136,21 +140,31 @@ class XRayEye(BECWidget, QWidget):
self._make_connections()
# Connection to redis endpoints
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
self.bec_dispatcher.connect_slot(self.getting_shutter_status, MessageEndpoints.device_readback("omnyfsh"))
self.bec_dispatcher.connect_slot(self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0]))
self.connect_motors()
self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger)
def _init_ui(self):
self.core_layout = QHBoxLayout(self)
self.root_layout = QVBoxLayout(self)
self.tab_widget = QTabWidget(parent=self)
self.root_layout.addWidget(self.tab_widget)
self.image = Image(parent=self)
self.alignment_tab = QWidget(parent=self)
self.core_layout = QHBoxLayout(self.alignment_tab)
self.image = Image(parent=self.alignment_tab)
self.image.color_map = "CET-L2"
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
self.image.inner_axes = False # Disable inner axes to maximize image area
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
self.image.enable_full_colorbar = True
self.image.invert_y = True # Invert y axis to match image coordinates
# Control panel on the right: vertical layout inside a fixed-width widget
self.control_panel = QWidget(parent=self)
self.control_panel = QWidget(parent=self.alignment_tab)
self.control_panel_layout = QVBoxLayout(self.control_panel)
self.control_panel_layout.setContentsMargins(0, 0, 0, 0)
self.control_panel_layout.setSpacing(10)
@@ -166,16 +180,35 @@ class XRayEye(BECWidget, QWidget):
self.live_preview_label = QLabel("Live Preview", parent=self)
self.live_preview_toggle = ToggleSwitch(parent=self)
self.live_preview_toggle.checked = False
header_row.addWidget(self.live_preview_label, 0, Qt.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignVCenter)
header_row.addWidget(self.live_preview_label, 0, Qt.AlignmentFlag.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
self.control_panel_layout.addLayout(header_row)
switch_row = QHBoxLayout()
switch_row.setContentsMargins(0, 0, 0, 0)
switch_row.setSpacing(8)
switch_row.addStretch()
self.camera_running_label = QLabel("Camera running", parent=self)
self.camera_running_toggle = ToggleSwitch(parent=self)
# self.camera_running_toggle.checked = False
self.camera_running_toggle.enabled.connect(self.camera_running_enabled)
self.shutter_label = QLabel("Shutter open", parent=self)
self.shutter_toggle = ToggleSwitch(parent=self)
# self.shutter_toggle.checked = False
self.shutter_toggle.enabled.connect(self.opening_shutter)
switch_row.addWidget(self.shutter_label, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.shutter_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.camera_running_label, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.camera_running_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
self.control_panel_layout.addLayout(switch_row)
# separator
self.control_panel_layout.addWidget(self._create_separator())
# 2D Positioner (fixed size)
self.motor_control_2d = XRayEye2DControl(parent=self)
self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignTop | Qt.AlignCenter)
self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignCenter)
# separator
self.control_panel_layout.addWidget(self._create_separator())
@@ -190,9 +223,8 @@ class XRayEye(BECWidget, QWidget):
# Submit button
self.submit_button = QPushButton("Submit", parent=self)
# Add to layout form
step_size_form.addWidget(QLabel("Horizontal", parent=self), 0, 0)
step_size_form.addWidget(QLabel("Step Size", parent=self), 0, 0)
step_size_form.addWidget(self.step_size, 0, 1)
step_size_form.addWidget(QLabel("Vertical", parent=self), 1, 0)
step_size_form.addWidget(self.submit_button, 2, 0, 1, 2)
# Add form to control panel
@@ -207,7 +239,8 @@ class XRayEye(BECWidget, QWidget):
self.sample_name_line_edit.setReadOnly(True)
form.addWidget(QLabel("Sample", parent=self), 0, 0)
form.addWidget(self.sample_name_line_edit, 0, 1)
self.message_line_edit = QLineEdit(parent=self)
self.message_line_edit = QTextEdit(parent=self)
self.message_line_edit.setFixedHeight(60)
self.message_line_edit.setReadOnly(True)
form.addWidget(QLabel("Message", parent=self), 1, 0)
form.addWidget(self.message_line_edit, 1, 1)
@@ -217,12 +250,39 @@ class XRayEye(BECWidget, QWidget):
self.control_panel.adjustSize()
p_hint = self.control_panel.sizeHint()
self.control_panel.setFixedWidth(p_hint.width())
self.control_panel.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
self.control_panel.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Expanding)
# Core Layout: image (expanding) | control panel (fixed)
self.core_layout.addWidget(self.image)
self.core_layout.addWidget(self.control_panel)
self.tab_widget.addTab(self.alignment_tab, "Alignment")
self.fit_tab = QWidget(parent=self)
self.fit_layout = QVBoxLayout(self.fit_tab)
self.waveform_x = Waveform(parent=self.fit_tab)
self.waveform_y = Waveform(parent=self.fit_tab)
self.waveform_x.plot(x=[0],y=[1], label="fit-x",dap="SineModel",dap_parameters={"frequency":{"value":0.0174533,"vary":False,"min":0.01,"max":0.02}},dap_oversample=5)
self.waveform_y.plot(x=[0],y=[2], label="fit-y",dap="SineModel")#,dap_oversample=5)
self.fit_x = self.waveform_x.curves[0]
self.fit_y = self.waveform_y.curves[0]
self.waveform_x.dap_params_update.connect(self.on_dap_params)
self.waveform_y.dap_params_update.connect(self.on_dap_params)
for wave in (self.waveform_x,self.waveform_y):
wave.x_label = "Angle (deg)"
wave.x_grid = True
wave.y_grid = True
wave.enable_toolbar = True
self.fit_layout.addWidget(self.waveform_x)
self.fit_layout.addWidget(self.waveform_y)
self.tab_widget.addTab(self.fit_tab, "Fit")
def _make_connections(self):
# Fetch initial state
self.on_live_view_enabled(True)
@@ -235,13 +295,14 @@ class XRayEye(BECWidget, QWidget):
def _create_separator(self):
sep = QFrame(parent=self)
sep.setFrameShape(QFrame.HLine)
sep.setFrameShadow(QFrame.Sunken)
sep.setFrameShape(QFrame.Shape.HLine)
sep.setFrameShadow(QFrame.Shadow.Sunken)
sep.setLineWidth(1)
return sep
def _init_gui_trigger(self):
self.dev.omny_xray_gui.read()
self.dev.omnyfsh.read()
################################################################################
# Device Connection logic
@@ -254,7 +315,7 @@ class XRayEye(BECWidget, QWidget):
for motor in possible_motors:
if motor in self.dev:
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
logger.info(f"Succesfully connected to {motor}")
logger.info(f"Successfully connected to {motor}")
################################################################################
# Properties ported from the original OmnyAlignment, can be adjusted as needed
@@ -290,6 +351,14 @@ class XRayEye(BECWidget, QWidget):
################################################################################
# Slots ported from the original OmnyAlignment, can be adjusted as needed
################################################################################
@SafeSlot(str)
def switch_tab(self,tab:str):
if tab == "fit":
self.tab_widget.setCurrentIndex(1)
else:
self.tab_widget.setCurrentIndex(0)
@SafeSlot()
def get_roi_coordinates(self) -> dict | None:
@@ -307,14 +376,50 @@ class XRayEye(BECWidget, QWidget):
self.live_preview_toggle.blockSignals(True)
if enabled:
self.live_preview_toggle.checked = enabled
self.image.image(CAMERA)
self.image.image(device_name=CAMERA[0],device_entry=CAMERA[1])
self.live_preview_toggle.blockSignals(False)
return
self.image.disconnect_monitor(CAMERA)
self.image.disconnect_monitor(CAMERA[0],CAMERA[1])
self.live_preview_toggle.checked = enabled
self.live_preview_toggle.blockSignals(False)
@SafeSlot(bool)
def camera_running_enabled(self, enabled: bool):
logger.info(f"Camera running: {enabled}")
self.camera_running_toggle.blockSignals(True)
self.dev.get(CAMERA[0]).live_mode_enabled.put(enabled)
self.camera_running_toggle.checked = enabled
self.camera_running_toggle.blockSignals(False)
@SafeSlot(dict,dict)
def getting_camera_status(self,data,meta):
print(f"msg:{data}")
live_mode_enabled = data.get("signals").get(f"{CAMERA[0]}_live_mode_enabled").get("value")
self.camera_running_toggle.blockSignals(True)
self.camera_running_toggle.checked = live_mode_enabled
self.camera_running_toggle.blockSignals(False)
@SafeSlot(bool)
def opening_shutter(self, enabled: bool):
logger.info(f"Shutter changed from GUI to: {enabled}")
self.shutter_toggle.blockSignals(True)
if enabled:
self.dev.omnyfsh.fshopen()
else:
self.dev.omnyfsh.fshclose()
# self.shutter_toggle.checked = enabled
self.shutter_toggle.blockSignals(False)
@SafeSlot(dict,dict)
def getting_shutter_status(self,data,meta):
shutter_open = bool(data.get("signals").get("omnyfsh_shutter").get("value"))
self.shutter_toggle.blockSignals(True)
self.shutter_toggle.checked = shutter_open
self.shutter_toggle.blockSignals(False)
@SafeSlot(bool, bool)
def on_motors_enable(self, x_enable: bool, y_enable: bool):
"""
@@ -327,58 +432,60 @@ class XRayEye(BECWidget, QWidget):
self.motor_control_2d.enable_controls_hor(x_enable)
self.motor_control_2d.enable_controls_ver(y_enable)
@SafeSlot(int)
def enable_submit_button(self, enable: int):
@SafeSlot(bool)
def enable_submit_button(self, enable: bool):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
if enable == -1:
self.submit_button.setEnabled(False)
else:
if enable:
self.submit_button.setEnabled(True)
else:
self.submit_button.setEnabled(False)
@SafeSlot(dict,dict)
def on_dap_params(self,data,meta):
print('#######################################')
print('getting dap parameters')
print(f"data: {data}")
print(f"meta: {meta}")
self.waveform_x.auto_range(True)
self.waveform_y.auto_range(True)
# self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
curve_id = meta.get("curve_id")
if curve_id == "fit-x-SineModel":
self.dev.omny_xray_gui.fit_params_x.set(data).wait()
print(f"setting x data to {data}")
else:
self.dev.omny_xray_gui.fit_params_y.set(data).wait()
print(f"setting y data to {data}")
# self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
@SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict):
#TODO implement if needed
print(f"data: {data}")
print(f"meta: {meta}")
@SafeSlot()
def submit_fit_array(self,fit_array):
self.tab_widget.setCurrentIndex(1)
# self.fix_x.title = " got fit array"
print(f"got fit array {fit_array}")
self.waveform_x.curves[0].set_data(x=fit_array[0],y=fit_array[1])
# self.fit_x.set_data(x=fit_array[0],y=fit_array[1])
# self.fit_y.set_data(x=fit_array[0],y=fit_array[2])
@SafeSlot(dict, dict)
def device_updates(self, data: dict, meta: dict):
"""
Slot to handle device updates from omny_xray_gui device.
Args:
data(dict): data from device
meta(dict): metadata from device
"""
signals = data.get('signals')
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
self.on_live_view_enabled(bool(enable_live_preview))
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
# Signals from epics gui device
# send message
user_message = signals.get("omny_xray_gui_send_message").get('value')
self.user_message = user_message
# sample name
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
self.sample_name = sample_message
# enable frame acquisition
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
self.on_live_view_enabled(bool(update_frame_acq))
# enable submit button
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
self.enable_submit_button(enable_submit_button)
@SafeSlot()
def submit(self):
"""Execute submit action by submit button."""
print('submit pushed')
self.submit_button.blockSignals(True)
if self.roi_manager.single_active_roi is None:
logger.warning("No active ROI")
return
@@ -400,12 +507,14 @@ class XRayEye(BECWidget, QWidget):
# submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
xval_x = getattr(self.dev.omny_xray_gui, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1)
print('submit done')
self.submit_button.blockSignals(False)
def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
@@ -416,9 +525,12 @@ if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.colors import apply_theme
app = QApplication(sys.argv)
apply_theme("light")
dispatcher = BECDispatcher(gui_id='xray')
win = XRayEye()
win.resize(1000, 800)

View File

@@ -9,17 +9,6 @@ eiger_1_5:
readoutPriority: async
softwareTrigger: False
eiger_9:
description: Eiger 9M detector
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
deviceConfig:
detector_distance: 100
beam_center: [0, 0]
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: False
ids_cam:
description: IDS camera for live image acquisition
deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera

View File

@@ -37,21 +37,6 @@ mcs:
readoutPriority: monitored
softwareTrigger: false
##########################################################################
########################### FAST SHUTTER #################################
##########################################################################
fsh:
description: Fast shutter manual control and readback
deviceClass: csaxs_bec.devices.epics.fast_shutter.cSAXSFastEpicsShutter
deviceConfig:
prefix: 'X12SA-ES1-TTL:'
onFailure: raise
enabled: true
readoutPriority: monitored
##########################################################################
######################## SMARACT STAGES ##################################
##########################################################################
@@ -75,7 +60,6 @@ xbpm3x:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -22.5
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -98,7 +82,6 @@ xbpm3y:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -2
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -121,7 +104,6 @@ sl3trxi:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -5.5
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -144,7 +126,6 @@ sl3trxo:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 6
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -167,7 +148,6 @@ sl3trxb:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -5.8
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -190,7 +170,6 @@ sl3trxt:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 5.5
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -213,7 +192,6 @@ fast_shutter_n1_x:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -7
in: 0
@@ -237,7 +215,6 @@ fast_shutter_o1_x:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -15.8
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -260,7 +237,6 @@ fast_shutter_o2_x:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -15.5
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -283,7 +259,6 @@ filter_array_1_x:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 25
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -306,7 +281,6 @@ filter_array_2_x:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 25.5
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -329,7 +303,6 @@ filter_array_3_x:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 25.8
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -352,7 +325,6 @@ filter_array_4_x:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 25
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -375,7 +347,6 @@ sl4trxi:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -5.5
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -398,7 +369,6 @@ sl4trxo:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 6
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -421,7 +391,6 @@ sl4trxb:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -5.8
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -444,7 +413,6 @@ sl4trxt:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 5.5
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -457,7 +425,7 @@ sl5trxi:
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: C
host: x12sa-eb-smaract-mcs-05.psi.ch
host: x12sa-eb-smaract-mcs-02.psi.ch
limits:
- -200
- 200
@@ -469,7 +437,6 @@ sl5trxi:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -6
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -480,7 +447,7 @@ sl5trxo:
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: D
host: x12sa-eb-smaract-mcs-05.psi.ch
host: x12sa-eb-smaract-mcs-02.psi.ch
limits:
- -200
- 200
@@ -492,7 +459,6 @@ sl5trxo:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 5.5
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -503,7 +469,7 @@ sl5trxb:
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: E
host: x12sa-eb-smaract-mcs-05.psi.ch
host: x12sa-eb-smaract-mcs-02.psi.ch
limits:
- -200
- 200
@@ -515,7 +481,6 @@ sl5trxb:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -5.5
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -526,7 +491,7 @@ sl5trxt:
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: F
host: x12sa-eb-smaract-mcs-05.psi.ch
host: x12sa-eb-smaract-mcs-02.psi.ch
limits:
- -200
- 200
@@ -538,7 +503,6 @@ sl5trxt:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 6
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -549,7 +513,7 @@ xbimtrx:
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: A
host: x12sa-eb-smaract-mcs-05.psi.ch
host: x12sa-eb-smaract-mcs-02.psi.ch
limits:
- -200
- 200
@@ -561,7 +525,6 @@ xbimtrx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: -14.7
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
@@ -572,7 +535,7 @@ xbimtry:
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: B
host: x12sa-eb-smaract-mcs-05.psi.ch
host: x12sa-eb-smaract-mcs-02.psi.ch
limits:
- -200
- 200
@@ -584,7 +547,6 @@ xbimtry:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
init_position: 0
# bl_smar_stage to use csaxs reference method. assign number according to axis channel

View File

@@ -89,7 +89,6 @@ xbpm2x:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 0
@@ -109,7 +108,6 @@ xbpm2y:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 1
@@ -129,7 +127,6 @@ cu_foilx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 2
@@ -149,7 +146,6 @@ scinx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 3

View File

@@ -17,7 +17,6 @@ feyex:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -16.267
out: -1
@@ -36,7 +35,6 @@ feyey:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -10.467
fheater:
@@ -54,7 +52,6 @@ fheater:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
foptx:
description: Optics X
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
@@ -70,7 +67,6 @@ foptx:
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -13.761
fopty:
@@ -88,7 +84,6 @@ fopty:
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0.552
out: 0.752
@@ -107,7 +102,6 @@ foptz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 23
fsamroy:
@@ -125,7 +119,6 @@ fsamroy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
fsamx:
description: Sample coarse X
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
@@ -141,7 +134,6 @@ fsamx:
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -1.1
fsamy:
@@ -159,7 +151,6 @@ fsamy:
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 2.75
ftracky:
@@ -177,7 +168,6 @@ ftracky:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftrackz:
description: Laser Tracker coarse Z
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
@@ -193,7 +183,6 @@ ftrackz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftransx:
description: Sample transer X
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
@@ -209,7 +198,6 @@ ftransx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftransy:
description: Sample transer Y
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
@@ -225,7 +213,6 @@ ftransy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
sensor_voltage: -2.4
ftransz:
@@ -243,7 +230,6 @@ ftransz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftray:
description: Sample transfer tray
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
@@ -259,7 +245,6 @@ ftray:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
############################################################
@@ -294,7 +279,6 @@ fosax:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 9.124
out: 5.3
@@ -313,7 +297,6 @@ fosay:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0.367
fosaz:
@@ -331,7 +314,6 @@ fosaz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 8.5
out: 6
@@ -352,7 +334,6 @@ rtx:
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
userParameter:
low_signal: 10000
min_signal: 9000
@@ -369,7 +350,6 @@ rty:
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
userParameter:
tomo_additional_offsety: 0
rtz:
@@ -384,7 +364,6 @@ rtz:
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
############################################################
####################### Cameras ############################
@@ -435,9 +414,9 @@ cam_xeye:
# deviceConfig:
# camera_id: 203
# bits_per_pixel: 24
# num_rotation_90: 3
# num_rotation_90: 2
# transpose: false
# force_monochrome: true
# force_monochrome: false
# m_n_colormode: 1
# enabled: true
# onFailure: buffer
@@ -460,8 +439,8 @@ flomni_temphum:
# ########## OMNY / flOMNI / LamNI fast shutter ##############
# ############################################################
omnyfsh:
description: omnyfsh connects to fast shutter at X12 if device fsh exists
deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastShutter
description: omnyfsh connects to read fast shutter at X12 if in that network
deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastEpicsShutter
deviceConfig: {}
enabled: true
onFailure: buffer
@@ -471,19 +450,10 @@ omnyfsh:
#################### GUI Signals ###########################
############################################################
omny_xray_gui:
description: Gui Epics signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
description: Gui signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayAlignGUI
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
calculated_signal:
description: Calculated signal from alignment for fit
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def just_rand():\n return 42"
enabled: true
readOnly: false
readoutPriority: baseline
readoutPriority: on_request

View File

@@ -19,7 +19,6 @@ leyex:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 14.117
leyey:
@@ -39,7 +38,6 @@ leyey:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 48.069
out: 0.5
@@ -60,7 +58,6 @@ loptx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -0.244
out: -0.699
@@ -81,7 +78,6 @@ lopty:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 3.724
out: 3.53
@@ -102,7 +98,6 @@ loptz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
lsamrot:
description: Sample rotation
deviceClass: csaxs_bec.devices.omny.galil.lgalil_ophyd.LamniGalilMotor
@@ -120,7 +115,6 @@ lsamrot:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
lsamx:
description: Sample coarse X
deviceClass: csaxs_bec.devices.omny.galil.lgalil_ophyd.LamniGalilMotor
@@ -138,7 +132,6 @@ lsamx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
center: 8.768
lsamy:
@@ -158,7 +151,6 @@ lsamy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
center: 10.041
@@ -184,7 +176,6 @@ losax:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -1.442
losay:
@@ -204,7 +195,6 @@ losay:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -0.171
out: 3.8
@@ -225,7 +215,6 @@ losaz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -1
out: -3
@@ -249,7 +238,6 @@ rtx:
deviceTags:
- lamni
readoutPriority: baseline
connectionTimeout: 20
enabled: true
readOnly: False
rty:
@@ -267,7 +255,6 @@ rty:
deviceTags:
- lamni
readoutPriority: baseline
connectionTimeout: 20
enabled: true
readOnly: False

View File

@@ -69,7 +69,6 @@ rtx:
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
userParameter:
low_signal: 8500
min_signal: 8000
@@ -85,7 +84,6 @@ rty:
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
userParameter:
tomo_additional_offsety: 0
rtz:
@@ -100,7 +98,6 @@ rtz:
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
# ############################################################
# ##################### OMNY samples #########################
@@ -168,7 +165,6 @@ ofzpx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -0.4317
ofzpy:
@@ -188,7 +184,6 @@ ofzpy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0.7944
out: 0.6377
@@ -209,7 +204,6 @@ ofzpz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0
otransx:
@@ -229,7 +223,6 @@ otransx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0
otransy:
@@ -249,7 +242,6 @@ otransy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
up_position: -1.2
gripper_sensorvoltagetarget: -2.30
@@ -270,7 +262,6 @@ otransz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0
osamx:
@@ -290,7 +281,6 @@ osamx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -0.1
osamz:
@@ -310,7 +300,6 @@ osamz:
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0
oosay:
@@ -330,7 +319,6 @@ oosay:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
near_field_in: 0.531
far_field_in: 0.4122
@@ -351,7 +339,6 @@ oosax:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
near_field_in: 3.2044
far_field_in: 3.022
@@ -372,7 +359,6 @@ oosaz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
near_field_in: -0.4452
far_field_in: 6.5
@@ -393,7 +379,6 @@ oparkz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0
oshuttleopen:
@@ -413,7 +398,6 @@ oshuttleopen:
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0
oshuttlealign:
@@ -433,7 +417,6 @@ oshuttlealign:
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0
osamy:
@@ -453,7 +436,6 @@ osamy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0
otracky:
@@ -473,7 +455,6 @@ otracky:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
start_pos: -4.3431
osamroy:
@@ -493,7 +474,6 @@ osamroy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0
otrackz:
@@ -513,7 +493,6 @@ otrackz:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
start_pos: -0.6948
oeyex:
@@ -533,7 +512,6 @@ oeyex:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
xray_in: -45.7394
oeyez:
@@ -553,7 +531,6 @@ oeyez:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
xray_in: -2
oeyey:
@@ -573,7 +550,6 @@ oeyey:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
xray_in: 0.0229
@@ -596,7 +572,6 @@ ocsx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
nothing: 0
ocsy:
@@ -614,7 +589,6 @@ ocsy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
nothing: 0
oshield:
@@ -632,6 +606,5 @@ oshield:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
nothing: 0

View File

@@ -1,15 +1 @@
############################################################
############################################################
##################### EPS ##################################
############################################################
x12saEPS:
description: X12SA EPS info and control
deviceClass: csaxs_bec.devices.epics.eps.EPS
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline

View File

@@ -64,7 +64,6 @@ npx:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
deviceTags:
- npoint
npy:
@@ -82,6 +81,5 @@ npy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
deviceTags:
- npoint

View File

@@ -0,0 +1,149 @@
"""Module for the EPICS integration of the AlliedVision Camera via Vimba SDK."""
import threading
import traceback
from enum import IntEnum
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt, Kind, Signal
from ophyd.areadetector import ADComponent as ADCpt
from ophyd.areadetector import DetectorBase
from ophyd_devices import PreviewSignal
from ophyd_devices.devices.areadetector.cam import VimbaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
logger = bec_logger.logger
class ACQUIRE_MODES(IntEnum):
"""Acquiring enums for Allied Vision Camera"""
ACQUIRING = 1
DONE = 0
class AlliedVisionCamera(PSIDeviceBase, DetectorBase):
"""
Epics Area Detector interface for the Allied Vision Alvium G1-507m camera via Vimba SDK.
The IOC runs with under the prefix: 'X12SA-GIGECAM-AV1:'.
"""
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
cam = ADCpt(VimbaDetectorCam, "cam1:")
image = ADCpt(ImagePlugin, "image1:")
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=0,
doc="Preview signal of the AlliedVision camera.",
)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
def __init__(
self,
*,
name: str,
prefix: str,
poll_rate: int = 5,
scan_info=None,
device_manager=None,
**kwargs,
):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._poll_thread = threading.Thread(
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
self._poll_thread_kill_event = threading.Event()
self._poll_start_event = threading.Event()
if poll_rate > 10:
logger.warning(f"Poll rate too high for Camera {self.name}, setting to 10 Hz max.")
poll_rate = 10
self._poll_rate = poll_rate
self._unique_array_id = 0
self._pv_timeout = 2.0
self.image: ImagePlugin
self._live_mode_lock = threading.RLock()
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None:
"""Start live mode."""
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
"""Stop live mode."""
self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
self._apply_live_mode(bool(value))
def _apply_live_mode(self, enabled: bool) -> None:
with self._live_mode_lock:
if enabled:
if not self._poll_start_event.is_set():
self._poll_start_event.set()
self.cam.acquire.put(ACQUIRE_MODES.ACQUIRING.value) # Start acquisition
else:
logger.info(f"Live mode already started for {self.name}.")
return
if self._poll_start_event.is_set():
self._poll_start_event.clear()
self.cam.acquire.put(ACQUIRE_MODES.DONE.value) # Stop acquisition
else:
logger.info(f"Live mode already stopped for {self.name}.")
def on_connected(self):
"""Reset the unique array ID on connection."""
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout)
self._poll_thread.start()
def _poll_array_data(self):
"""Poll the array data for preview updates."""
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
while self._poll_start_event.wait():
try:
# First check if there is a new image
if self.image.unique_id.get() != self._unique_array_id:
self._unique_array_id = self.image.unique_id.get()
else:
continue # No new image, skip update
# Get new image data
value = self.image.array_data.get()
if value is None:
logger.info(f"No image data available for preview of {self.name}")
continue
array_size = self.image.array_size.get()
if array_size[0] == 0: # 2D image, not color image
array_size = array_size[1:]
# Geometry correction for the image
data = np.reshape(value, array_size)
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while polling array data for preview of {self.name}: {content}"
)
def on_destroy(self):
"""Stop the polling thread on destruction."""
self._poll_thread_kill_event.set()
self._poll_start_event.set()
if self._poll_thread.is_alive():
self._poll_thread.join(timeout=2)

View File

@@ -37,7 +37,7 @@ to interrupt and ongoing sequence if needed.
- a = t0 + 2ms (2ms delay to allow the shutter to open)
- b = a + 1us (short pulse)
- c = t0
- d = a + exp_time * burst_count
- d = a + exp_time * burst_count + 1ms (to allow the shutter to close)
- e = d
- f = e + 1us (short pulse to OR gate for MCS triggering)

View File

@@ -24,7 +24,7 @@ DELAY CHANNELS:
- a = t0 + 2ms (2ms delay to allow the shutter to open)
- b = a + 1us (short pulse)
- c = t0
- d = a + exp_time * burst_count
- d = a + exp_time * burst_count + 1ms (to allow the shutter to close)
- e = d
- f = e + 1us (short pulse to OR gate for MCS triggering)
"""
@@ -37,9 +37,7 @@ import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd_devices import CompareStatus, DeviceStatus, StatusBase, TransitionStatus
from ophyd_devices import CompareStatus, DeviceStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
@@ -71,7 +69,7 @@ logger = bec_logger.logger
# This can be adapted as needed, or fine-tuned per channel. On every reload of the
# device configuration in BEC, these values will be set into the DDG1 device.
_DEFAULT_CHANNEL_CONFIG: ChannelConfig = {
"amplitude": 4.5,
"amplitude": 5.0,
"offset": 0.0,
"polarity": OUTPUTPOLARITY.POSITIVE,
"mode": "ttl",
@@ -133,27 +131,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
device_manager (DeviceManagerBase | None, optional): Device manager. Defaults to None.
"""
USER_ACCESS = ["keep_shutter_open_during_scan", "set_trigger"]
# TODO Consider using the 'fsh' device instead.
fast_shutter_readback = Cpt(
EpicsSignalRO,
read_pv="X12SA-ES1-TTL:INP_01",
add_prefix=("",), # Add this to prevent the prefix to be added to the signal
kind=Kind.omitted,
auto_monitor=True,
)
# The shutter control PV can indicate if the shutter is requested to be kept open. If that
# is the case, we can not use the signal shutter_readback signal to check if the delay cycle
# finishes but have to use the polling of the event status register to check if the burst finished.
fast_shutter_control = Cpt(
EpicsSignalRO,
read_pv="X12SA-ES1-TTL:OUT_01",
add_prefix=("",), # Add this to prevent the prefix to be added to the signal
kind=Kind.omitted,
auto_monitor=True,
)
def __init__(
self,
name: str,
@@ -165,7 +142,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._shutter_to_open_delay = 2e-3
self.device_manager = device_manager
self._poll_thread = threading.Thread(target=self._poll_event_status, daemon=True)
self._poll_thread_run_event = threading.Event()
@@ -216,21 +192,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
self.burst_delay.put(0)
self.burst_count.put(1)
def keep_shutter_open_during_scan(self, open: True) -> None:
"""
Method to configure the delay generator for keeping the shutter open during a scans.
This means that the additional delay to open the shutter needs to be removed (2e-3)
from the timing of the signals.
Args:
open (bool): If True, the shutter will be kept open during the scan.
If False, the shutter will be opened and closed for each trigger cycle.
"""
if open is True:
self._shutter_to_open_delay = 0
else:
self._shutter_to_open_delay = 2e-3
def on_stage(self) -> None:
"""
@@ -269,18 +230,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
if self.burst_count.get() != 1:
self.burst_count.put(1)
#####################################
## Setup trigger source if needed ###
#####################################
# NOTE Some scans may change the trigger source to an external trigger,
# so we will make sure that the default trigger source is set for the DDG1
# before each scan. If a scan requires a different trigger source, i.e.
# external triggers then the scan should implement this change after the
# on_stage method was called.
if self.trigger_source.get() != DEFAULT_TRIGGER_SOURCE:
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
#########################################
### Setup timing for burst and delays ###
#########################################
@@ -290,34 +239,27 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Burst Period DDG1
# Set burst_period to shutter width
# c/t0 + self._shutter_to_open_delay + exp_time * burst_count
shutter_width = (
self._shutter_to_open_delay + exp_time * frames_per_trigger
) # Shutter starts closing at end of exposure
# c/t0 + 2ms + exp_time * burst_count + 1ms
shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3
if self.burst_period.get() != shutter_width:
self.burst_period.put(shutter_width)
# Trigger DDG2
# a = t0 + 2ms, b = a + 1us
# a has reference to t0, b has reference to a
# Add delay of self._shutter_to_open_delay to allow shutter to open
self.set_delay_pairs(channel="ab", delay=self._shutter_to_open_delay, width=1e-6)
# Add delay of 2ms to allow shutter to open
self.set_delay_pairs(channel="ab", delay=2e-3, width=1e-6)
# Trigger shutter
# d = c/t0 + self._shutter_to_open_delay + exp_time * burst_count + 1ms
# d = c/t0 + 2ms + exp_time * burst_count + 1ms
# c has reference to t0, d has reference to c
# Shutter opens without delay at t0, closes after exp_time * burst_count + 2ms (self._shutter_to_open_delay)
# Shutter opens without delay at t0, closes after exp_time * burst_count + 3ms (2ms open, 1ms close)
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
self.set_delay_pairs(channel="gh", delay=self._shutter_to_open_delay, width=(shutter_width-self._shutter_to_open_delay))
# Trigger extra pulse for MCS OR gate
# f = e + 1us
# e has refernce to d, f has reference to e
if self.scan_info.msg.scan_type == "fly":
self.set_delay_pairs(channel="ef", delay=0, width=0)
else:
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
# NOTE Add additional sleep to make sure that the IOC and DDG HW process the values properly
# This value has been choosen empirically after testing with the HW. It's
@@ -489,19 +431,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
def on_trigger(self) -> DeviceStatus:
"""
This method is called from BEC as a software trigger. Here the logic is as follows:
We first check if the trigger_source is set to SINGLE_SHOT. Only then will we received,
otherwise we return a status object directly as the DDG is triggered by an external
source which will have to implement its own logic to wait for trigger signals to
be received.
I SINGLE_SHOT, the implementation here will send a software trigger. Now there are
two options to wait for the trigger (burst) cycle to be done. One is to rely on the
signal of the "mcs" card if it is present. However, this is only possible if the
scan_type is not "fly" as in fly scans the ef channel is not triggered to send the last
pulse to the card (but the card is finishing its acquisition in complete itself). Then
we rely on the polling of the event status register to check if the burst cycle is done.
This method is called from BEC as a software trigger.
It follows a specific procedure to ensure that the DDG1 and MCS card are properly handled
on a trigger event. The established logic is as follows:
@@ -520,46 +450,33 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
- Return the status object to BEC which will automatically resolve once the status register has
the END_OF_BURST bit set. The callback of the status object will also stop the polling loop.
"""
overall_start = time.time()
self._stop_polling()
# NOTE If the trigger source is not SINGLE_SHOT, the DDG is triggered by an external source
# thus we can not expect that trigger signals are meant to be awaited for. In this case,
# we can directly return.
if self.trigger_source.get() != TRIGGERSOURCE.SINGLE_SHOT.value:
status = StatusBase(obj=self)
status.set_finished()
return status
self._poll_thread_poll_loop_done.wait(timeout=1)
# NOTE: This sleep is important to ensure that the HW is ready to process new commands.
# It has been empirically determined after long testing that this improves stability.
time.sleep(0.02)
# NOTE If the MCS card is present in the current session of BEC,
# we prepare the card for the next trigger. The procedure is implemented
# in the '_prepare_mcs_on_trigger' method. We will also use the mcs card
# to indicate when the burst cycle is done. If no mcs card is available
# the fallback is to use the polling of the DDG
# in the '_prepare_mcs_on_trigger' method.
# Prepare the MCS card for the next software trigger
mcs = self.device_manager.devices.get("mcs", None)
if mcs is None or mcs.enabled is False or self.scan_info.msg.scan_type == "fly":
self._poll_thread_poll_loop_done.wait(timeout=1)
logger.warning("Did not find mcs card with name 'mcs' in current session")
time.sleep(0.02)
# Shutter is kept open, we can only rely on the event status register
status = self._prepare_trigger_status_event()
# Start polling thread again to monitor event status
self._start_polling()
if mcs is None or mcs.enabled is False:
logger.info("Did not find mcs card with name 'mcs' in current session")
else:
start_time = time.time()
logger.debug(f"Preparing mcs card ")
status_mcs = self._prepare_mcs_on_trigger(mcs)
# NOTE Timeout of 3s should be plenty, any longer wait should checked. If this happens to crash
# an acquisition regularly with a WaitTimeoutError, the timeout can be increased but it should
# be investigated why the EPICS interface is slow to respond.
status_mcs.wait(timeout=3)
status = TransitionStatus(mcs.acquiring, [ACQUIRING.ACQUIRING, ACQUIRING.DONE])
logger.debug(f"Finished preparing mcs card {time.time()-start_time}")
# Send trigger
# Prepare StatusBitsCompareStatus to resolve once the END_OF_BURST bit was set.
status = self._prepare_trigger_status_event()
# Start polling thread again to monitor event status
self._start_polling()
# Trigger the DDG1
self.trigger_shot.put(1, use_complete=True)
self.cancel_on_stop(status)
logger.info(f"Configured ddg in {time.time()-overall_start}")
return status
def on_stop(self) -> None:

View File

@@ -37,7 +37,6 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
ChannelConfig,
DelayGeneratorCSAXS,
LiteralChannels,
BURSTCONFIG,
)
logger = bec_logger.logger
@@ -48,7 +47,7 @@ logger = bec_logger.logger
# NOTE Default channel configuration for the DDG2 delay generator channels
_DEFAULT_CHANNEL_CONFIG: ChannelConfig = {
"amplitude": 4.5,
"amplitude": 5.0,
"offset": 0.0,
"polarity": OUTPUTPOLARITY.POSITIVE,
"mode": "ttl",
@@ -135,9 +134,6 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
self.set_references_for_channels(DEFAULT_REFERENCES)
# Set burst config
self.burst_config.put(BURSTCONFIG.FIRST_CYCLE.value)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""

View File

@@ -488,7 +488,6 @@ class DelayGeneratorCSAXS(Device):
name="trigger_source",
kind=Kind.omitted,
doc="Trigger Source for the DDG, options in TRIGGERSOURCE",
auto_monitor=True,
)
trigger_level = Cpt(
EpicsSignal,

View File

@@ -1,435 +0,0 @@
"""EPS module for cSAXS beamline: defines the EPS device with its components and methods."""
# fmt: off
# Disable Black formatting for this file to preserve an easier readable structure for the component definitions.
# pylint: disable=line-too-long
from __future__ import annotations
import time
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd_devices import PSIDeviceBase
logger = bec_logger.logger
# ---------------------------
# Registry: sections/channels
# ---------------------------
class EPSSubDevices(Device):
"""Base class for EPS sub-device components (e.g. alarms, valves, shutters). with common methods if needed."""
def describe(self) -> dict:
desc = super().describe()
for walk in self.walk_signals():
if walk.item.attr_name not in desc:
desc[walk.item.attr_name] = walk.item.describe()
return desc
class EPSAlarms(EPSSubDevices):
"""EPS alarms at the cSAXS beamline."""
eps_alarm_cnt = Cpt(EpicsSignalRO, read_pv="X12SA-EPS-PLC:AlarmCnt_EPS", add_prefix=("",), name="eps_alarm_cnt", kind=Kind.omitted, doc="X12SA EPS Alarm count", auto_monitor=True, labels={"alarm"})
mis_alarm_cnt = Cpt(EpicsSignalRO, read_pv="ARS00-MIS-PLC-01:AlarmCnt_Frontends", add_prefix=("",), name="mis_alarm_cnt", kind=Kind.omitted, doc="FrontEnd MIS Alarm count", auto_monitor=True, labels={"alarm"})
class ValvesFrontend(EPSSubDevices):
"""Valves frontend at the cSAXS beamline."""
fe_vvpg_0000 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-0000:PLC_OPEN", add_prefix=("",), name="fevvpg0000", kind=Kind.omitted, doc="FE-VVPG-0000", auto_monitor=True, labels={"valve"})
fe_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-1010:PLC_OPEN", add_prefix=("",), name="fevvpg1010", kind=Kind.omitted, doc="FE-VVPG-1010", auto_monitor=True, labels={"valve"})
fe_vvfv_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVFV-2010:PLC_OPEN", add_prefix=("",), name="fevvfv2010", kind=Kind.omitted, doc="FE-VVFV-2010", auto_monitor=True, labels={"valve"})
fe_vvpg_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-2010:PLC_OPEN", add_prefix=("",), name="fevvpg2010", kind=Kind.omitted, doc="FE-VVPG-2010", auto_monitor=True, labels={"valve"})
class ValvesOptics(EPSSubDevices):
"""Valves at the optics hutch."""
op_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-1010:PLC_OPEN", add_prefix=("",), name="opvvpg1010", kind=Kind.omitted, doc="OP-VVPG-1010", auto_monitor=True, labels={"valve"})
op_vvpg_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-2010:PLC_OPEN", add_prefix=("",), name="opvvpg2010", kind=Kind.omitted, doc="OP-VVPG-2010", auto_monitor=True, labels={"valve"})
op_vvpg_3010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-3010:PLC_OPEN", add_prefix=("",), name="opvvpg3010", kind=Kind.omitted, doc="OP-VVPG-3010", auto_monitor=True, labels={"valve"})
op_vvpg_3020 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-3020:PLC_OPEN", add_prefix=("",), name="opvvpg3020", kind=Kind.omitted, doc="OP-VVPG-3020", auto_monitor=True, labels={"valve"})
op_vvpg_4010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-4010:PLC_OPEN", add_prefix=("",), name="opvvpg4010", kind=Kind.omitted, doc="OP-VVPG-4010", auto_monitor=True, labels={"valve"})
op_vvpg_5010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-5010:PLC_OPEN", add_prefix=("",), name="opvvpg5010", kind=Kind.omitted, doc="OP-VVPG-5010", auto_monitor=True, labels={"valve"})
op_vvpg_6010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-6010:PLC_OPEN", add_prefix=("",), name="opvvpg6010", kind=Kind.omitted, doc="OP-VVPG-6010", auto_monitor=True, labels={"valve"})
op_vvpg_7010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-7010:PLC_OPEN", add_prefix=("",), name="opvvpg7010", kind=Kind.omitted, doc="OP-VVPG-7010", auto_monitor=True, labels={"valve"})
class ValvesEndstation(EPSSubDevices):
"""Endstation valves at the cSAXS beamline."""
es_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-ES-VVPG-1010:PLC_OPEN", add_prefix=("",), name="esvvpg1010", kind=Kind.omitted, doc="ES-VVPG-1010", auto_monitor=True, labels={"valve"})
class ShuttersFrontend(EPSSubDevices):
"""Shutters frontend."""
fe_psh1 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-PSH1-EMLS-0010:OPEN", add_prefix=("",), name="fepsh1", kind=Kind.omitted, doc="FE-PSH1-EMLS-0010", auto_monitor=True, labels={"shutter"})
fe_sto1 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-STO1-EMLS-0010:OPEN", add_prefix=("",), name="festo1", kind=Kind.omitted, doc="FE-STO1-EMLS-0010", auto_monitor=True, labels={"shutter"})
class ShuttersEndstation(EPSSubDevices):
"""Shutters at the endstation."""
es_psh17010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-PSH1-EMLS-7010:OPEN", add_prefix=("",), name="espsh17010", kind=Kind.omitted, doc="OP-PSH1-EMLS-7010", auto_monitor=True, labels={"shutter"})
class DMMMonochromator(EPSSubDevices):
"""DMM monochromator signals at the cSAXS beamline."""
dmm_temp_surface_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3010:TEMP", add_prefix=("",), name="dmm_temp_surface_1", kind=Kind.omitted, doc="DMM Temp Surface 1", auto_monitor=True, labels={"temp"})
dmm_temp_surface_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3020:TEMP", add_prefix=("",), name="dmm_temp_surface_2", kind=Kind.omitted, doc="DMM Temp Surface 2", auto_monitor=True, labels={"temp"})
dmm_temp_shield_1_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3030:TEMP", add_prefix=("",), name="dmm_temp_shield_1_disaster", kind=Kind.omitted, doc="DMM Temp Shield 1 (disaster)", auto_monitor=True, labels={"temp"})
dmm_temp_shield_2_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3040:TEMP", add_prefix=("",), name="dmm_temp_shield_2_disaster", kind=Kind.omitted, doc="DMM Temp Shield 2 (disaster)", auto_monitor=True, labels={"temp"})
dmm_translation_thru = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3010:THRU", add_prefix=("",), name="dmm_translation_thru", kind=Kind.omitted, doc="DMM Translation ThruPos", auto_monitor=True, labels={"switch"})
dmm_translation_in = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3020:IN", add_prefix=("",), name="dmm_translation_in", kind=Kind.omitted, doc="DMM Translation InPos", auto_monitor=True, labels={"switch"})
dmm_bragg_thru = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3030:THRU", add_prefix=("",), name="dmm_bragg_thru", kind=Kind.omitted, doc="DMM Bragg ThruPos", auto_monitor=True, labels={"switch"})
dmm_bragg_in = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3040:IN", add_prefix=("",), name="dmm_bragg_in", kind=Kind.omitted, doc="DMM Bragg InPos", auto_monitor=True, labels={"switch"})
dmm_heater_fault_xtal_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3050:SWITCH", add_prefix=("",), name="dmm_heater_fault_xtal_1", kind=Kind.omitted, doc="DMM Heater Fault XTAL 1", auto_monitor=True, labels={"fault"})
dmm_heater_fault_xtal_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3060:SWITCH", add_prefix=("",), name="dmm_heater_fault_xtal_2", kind=Kind.omitted, doc="DMM Heater Fault XTAL 2", auto_monitor=True, labels={"fault"})
dmm_heater_fault_support_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3070:SWITCH", add_prefix=("",), name="dmm_heater_fault_support_1", kind=Kind.omitted, doc="DMM Heater Fault Support 1", auto_monitor=True, labels={"fault"})
dmm_energy = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:ENERGY-GET", add_prefix=("",), name="dmm_energy", kind=Kind.omitted, doc="DMM Energy", auto_monitor=True, labels={"energy"})
dmm_position = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:POSITION", add_prefix=("",), name="dmm_position", kind=Kind.omitted, doc="DMM Position", auto_monitor=True, labels={"string"})
dmm_stripe = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:STRIPE", add_prefix=("",), name="dmm_stripe", kind=Kind.omitted, doc="DMM Stripe", auto_monitor=True, labels={"string"})
class CCMMonochromator(EPSSubDevices):
"""CCM monochromator signals at the cSAXS beamline."""
ccm_temp_crystal = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-ETTC-4010:TEMP", add_prefix=("",), name="ccm_temp_crystal", kind=Kind.omitted, doc="CCM Temp Crystal", auto_monitor=True, labels={"temp"})
ccm_temp_shield_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-ETTC-4020:TEMP", add_prefix=("",), name="ccm_temp_shield_disaster", kind=Kind.omitted, doc="CCM Temp Shield (disaster)", auto_monitor=True, labels={"temp"})
ccm_heater_fault_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4010:SWITCH", add_prefix=("",), name="ccm_heater_fault_1", kind=Kind.omitted, doc="CCM Heater Fault 1", auto_monitor=True, labels={"fault"})
ccm_heater_fault_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4020:SWITCH", add_prefix=("",), name="ccm_heater_fault_2", kind=Kind.omitted, doc="CCM Heater Fault 2", auto_monitor=True, labels={"fault"})
ccm_heater_fault_3 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4030:SWITCH", add_prefix=("",), name="ccm_heater_fault_3", kind=Kind.omitted, doc="CCM Heater Fault 3", auto_monitor=True, labels={"fault"})
ccm_energy = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM1:ENERGY-GET", add_prefix=("",), name="ccm_energy", kind=Kind.omitted, doc="CCM Energy", auto_monitor=True, labels={"energy"})
ccm_position = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM1:POSITION", add_prefix=("",), name="ccm_position", kind=Kind.omitted, doc="CCM Position", auto_monitor=True, labels={"string"})
class CoolingWater(EPSSubDevices):
"""Cooling water signals at the cSAXS beamline."""
op_sl1_efsw_2010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL1-EFSW-2010:FLOW", add_prefix=("",), name="op_sl1_efsw_2010_flow", kind=Kind.omitted, doc="OP-SL1-EFSW-2010", auto_monitor=True, labels={"flow"})
op_sl2_efsw_2010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL2-EFSW-2010:FLOW", add_prefix=("",), name="op_sl2_efsw_2010_flow", kind=Kind.omitted, doc="OP-SL2-EFSW-2010", auto_monitor=True, labels={"flow"})
op_eb1_efsw_5010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-EB1-EFSW-5010:FLOW", add_prefix=("",), name="op_eb1_efsw_5010_flow", kind=Kind.omitted, doc="OP-EB1-EFSW-5010", auto_monitor=True, labels={"flow"})
op_eb1_efsw_5020_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-EB1-EFSW-5020:FLOW", add_prefix=("",), name="op_eb1_efsw_5020_flow", kind=Kind.omitted, doc="OP-EB1-EFSW-5020", auto_monitor=True, labels={"flow"})
op_sl3_efsw_5010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL3-EFSW-5010:FLOW", add_prefix=("",), name="op_sl3_efsw_5010_flow", kind=Kind.omitted, doc="OP-SL3-EFSW-5010", auto_monitor=True, labels={"flow"})
op_kb_efsw_6010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-KB-EFSW-6010:FLOW", add_prefix=("",), name="op_kb_efsw_6010_flow", kind=Kind.omitted, doc="OP-KB-EFSW-6010", auto_monitor=True, labels={"flow"})
op_psh1_efsw_7010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-PSH1-EFSW-7010:FLOW", add_prefix=("",), name="op_psh1_efsw_7010_flow", kind=Kind.omitted, doc="OP-PSH1-EFSW-7010", auto_monitor=True, labels={"flow"})
es_eb2_efsw_1010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-ES-EB2-EFSW-1010:FLOW", add_prefix=("",), name="es_eb2_efsw_1010_flow", kind=Kind.omitted, doc="ES-EB2-EFSW-1010", auto_monitor=True, labels={"flow"})
op_cs_ecvw_0010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CS-ECVW-0010:PLC_OPEN", add_prefix=("",), name="op_cs_ecvw_0010", kind=Kind.omitted, doc="OP-CS-ECVW-0010", auto_monitor=True, labels={"valve"})
op_cs_ecvw_0020 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CS-ECVW-0020:PLC_OPEN", add_prefix=("",), name="op_cs_ecvw_0020", kind=Kind.omitted, doc="OP-CS-ECVW-0020", auto_monitor=True, labels={"valve"})
class EPS(PSIDeviceBase):
"""EPS device for the cSAXS beamline."""
USER_ACCESS = [
"show_all",
"water_cooling_op",
]
alarms = Cpt(EPSAlarms, name="alarms", doc="EPS Alarms")
valves_frontend = Cpt(ValvesFrontend, name="valves_frontend", doc="Valves Frontend")
valves_optics = Cpt(ValvesOptics, name="valves_optics", doc="Valves Optics Hutch")
valves_es = Cpt(ValvesEndstation, name="valves_es", doc="Valves ES Hutch")
shutters_frontend = Cpt(ShuttersFrontend, name="shutters_frontend", doc="Shutters Frontend")
shutters_es = Cpt(ShuttersEndstation, name="shutters_es", doc="Shutters Endstation")
dmm_monochromator = Cpt(DMMMonochromator, name="dmm_monochromator", doc="DMM Monochromator")
ccm_monochromator = Cpt(CCMMonochromator, name="ccm_monochromator", doc="CCM Monochromator")
cooling_water = Cpt(CoolingWater, name="cooling_water", doc="Cooling Water")
# Acknowledgment signals for PLC communication (if needed for future use)
ackerr = Cpt(EpicsSignal, read_pv="X12SA-EPS-PLC:ACKERR-REQUEST", add_prefix=("",), name="ackerr", kind=Kind.omitted, doc="ACKERR request - OP-CS-ECVW-0020", auto_monitor=True, labels={"request"})
request = Cpt(EpicsSignal, read_pv="X12SA-OP-CS-ECVW:PLC_REQUEST", add_prefix=("",), name="op_cs_ecvw_request", kind=Kind.omitted, doc="PLC request - OP-CS-ECVW-PLC_REQUEST", auto_monitor=True, labels={"request"})
def _notify(self, msg: str, show_as_client_msg: bool = True):
"""Utility method to print a message, and optionally send it to the client UI if it should be shown also as a client message."""
try:
if show_as_client_msg:
self.device_manager.connector.send_client_info(msg, scope="", show_asap=True)
else:
print(msg)
except Exception:
logger.error(f"Failed to send client message, falling back to print: {msg}")
print(str(msg))
# ----------------------------------------------------------
# Water cooling operation
# ----------------------------------------------------------
def safe_get(self, sig, default=None):
"""Helper method to safely get a signal value, returning a default if there's an error."""
try:
return sig.get()
except Exception as ex:
logger.warning(f"Failed to get signal {sig.pvname}: {ex}")
return default
def water_cooling_op(self):
"""
Open ECVW valves, reset EPS alarms, monitor for 20s,
then ensure stability (valves remain open) for 10s.
All messages sent to client.
"""
POLL_PERIOD = 2
TIMEOUT = 20
STABILITY = 15
self._notify("=== Water Cooling Operation ===")
# --- Signals ---
eps_alarm_sig = self.alarms.eps_alarm_cnt
ackerr = self.ackerr
request = self.request
valves = [self.cooling_water.op_cs_ecvw_0010, self.cooling_water.op_cs_ecvw_0020]
# Flow channels list extracted from CHANNELS
flow_items = [walk.item for walk in self.cooling_water.walk_signals() if "flow" in walk.item._ophyd_labels_]
# --- Step 1: EPS alarm reset ---
alarm_value = self.safe_get(eps_alarm_sig, 0)
if alarm_value and alarm_value > 0:
self._notify(f"[WaterCooling] EPS alarms present ({alarm_value}) → resetting…")
try:
ackerr.put(1)
except Exception as ex:
self._notify(f"[WaterCooling] WARNING: ACKERR write failed: {ex}")
time.sleep(0.3)
else:
self._notify("[WaterCooling] No EPS alarms detected.")
# --- Step 2: Issue open request ---
self._notify("[WaterCooling] Sending cooling-valve OPEN request…")
try:
request.put(1)
except Exception as ex:
self._notify(f"[WaterCooling] ERROR: Failed to send OPEN request: {ex}")
return False
# --- Step 3: Monitoring loop (clean client table output) ---
start = time.time()
end = start + TIMEOUT
stable_until = None
# Print (server-side) header once
print("Monitoring valves and flow sensors...")
print(f" Valves: {valves[0].attr_name[-4:]}, {valves[1].attr_name[-4:]}")
print(f" Note: stability requires valves to remain OPEN for {STABILITY} seconds.")
# One table header to the client (via device manager)
# Fixed-width columns for alignment in monospaced UI
table_header = f"{'Time':>6} | {'Valves':<21} | {'Flows (OK/FAIL/N/A)':<20}"
self._notify(table_header)
def snapshot():
# Valve snapshot
v_states = [self.safe_get(v, None) for v in valves]
v1 = f"{valves[0].attr_name[-4:]}=" + ("OPEN " if v_states[0] is True or v_states[0] == 1 else "CLOSED" if v_states[0] is False or v_states[0] == 0 else "N/A ")
v2 = f"{valves[1].attr_name[-4:]}=" + ("OPEN " if v_states[1] is True or v_states[1] == 1 else "CLOSED" if v_states[1] is False or v_states[1] == 0 else "N/A ")
# 2 valves with a single space between => width ~ 21
valve_str = f"{v1} {v2}"
# Flow summary: OK/FAIL/N/A counts (compact)
flow_states = []
for fsig in flow_items:
fval = self.safe_get(fsig, None)
flow_states.append(True if fval in (1, True) else False if fval in (0, False) else None)
ok = sum(1 for f in flow_states if f is True)
fail = sum(1 for f in flow_states if f is False)
na = sum(1 for f in flow_states if f is None)
flow_summary = f"{ok:>2} / {fail:>2} / {na:>2}"
return v_states, valve_str, flow_summary
while True:
# TODO Consider adding a timeout to avoid infinite loop.
now = time.time()
elapsed = int(now - start)
if now > end:
# One last line to client
v_states, valves_s, flows_s = snapshot()
self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}")
print("→ TIMEOUT: Cooling valves failed to remain OPEN.")
return False
# Live snapshot
v_states, valves_s, flows_s = snapshot()
# Exactly one concise line to client per cycle
self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}")
both_open = all(s is not None and bool(s) for s in v_states)
if both_open:
if stable_until is None:
stable_until = now + STABILITY
print(f"[WaterCooling] Both valves OPEN → starting {STABILITY}s stability window…")
else:
if now >= stable_until:
print("→ SUCCESS: Valves remained OPEN during stability window.")
return True
else:
if stable_until is not None:
print("[WaterCooling] Valve closed again → restarting stability window.")
stable_until = None
time.sleep(POLL_PERIOD)
def show_all(self):
red = "\x1b[91m"
green = "\x1b[92m"
white = "\x1b[0m"
bold = "\x1b[1m"
cyan = "\x1b[96m"
# ---- New: enum maps for numeric -> string rendering ----
POSITION_ENUM = {0: "out of beam", 1: "in beam"}
STRIPE_ENUM = {0: "Stripe 1 W/B4C", 1: "Stripe 2 NiV/B4C"}
POSITION_ATTRS = {self.dmm_monochromator.dmm_position.attr_name, self.ccm_monochromator.ccm_position.attr_name}
STRIPE_ATTRS = {self.dmm_monochromator.dmm_stripe.attr_name}
def is_bool_like(v):
return isinstance(v, (bool, int)) and v in (0, 1, True, False)
# ---- Changed: accept attr in formatter so we can apply enum mapping ----
def fmt_value(value: any, signal: EpicsSignalRO):
if value is None:
return f"{red}MISSING{white}"
attr = signal.attr_name
# ---------- Explicit enum mappings by attribute ----------
if attr in POSITION_ATTRS:
# Position comes as numeric 0/1
try:
iv = int(value)
return POSITION_ENUM.get(iv, f"{iv}")
except Exception:
# Fallback if its already a string or unexpected
return f"{value}"
if attr in STRIPE_ATTRS:
# Stripe comes as numeric 0/1
try:
iv = int(value)
return STRIPE_ENUM.get(iv, f"{iv}")
except Exception:
return f"{value}"
# ------------------- TEMPERATURE -------------------
if "temp" in signal._ophyd_labels_ and isinstance(value, (int, float)):
return f"{value:.1f}"
# ------------------- ENERGY ------------------------
if "energy" in signal._ophyd_labels_ and isinstance(value, (int, float)):
return f"{value:.4f}"
# ------------------- STRINGS -----------------------
if "string" in signal._ophyd_labels_ or "position" in signal._ophyd_labels_:
# For other strings, just echo the value
return f"{value}"
# ------------------- SWITCH (ACTIVE/INACTIVE) ------
if "switch" in signal._ophyd_labels_ and is_bool_like(value):
return f"{green+'ACTIVE'+white if value else red+'INACTIVE'+white}"
# ------------------- FAULT (OK/FAULT) --------------
if "fault" in signal._ophyd_labels_ and is_bool_like(value):
return f"{green+'OK'+white if not value else red+'FAULT'+white}"
# ------------------- VALVE/SHUTTER -----------------
if ("valve" in signal._ophyd_labels_ or "shutter" in signal._ophyd_labels_) and is_bool_like(value):
return f"{green+'OPEN'+white if value else red+'CLOSED'+white}"
# ------------------- FLOW (OK/FAIL) ----------------
if "flow" in signal._ophyd_labels_ and is_bool_like(value):
return f"{green}OK{white}" if bool(value) else f"{red}FAIL{white}"
# ------------------- FALLBACK -----------------------
return f"{value}"
# ------------------- PRINT START ---------------------
print(f"{bold}X12SA EPS status{white}")
for name, component in self._sig_attrs.items():
sub_device = getattr(self, name)
rows = []
# Only print sub-devices, not individual request signals
if not isinstance(sub_device, Device):
continue
print(f"\n{bold}{component.doc}{white}")
for sub_walk in sub_device.walk_components():
cpt: Cpt = sub_walk.item
it: EpicsSignalRO = getattr(sub_device, cpt.attr)
val = self.safe_get(it)
rows.append((cpt.doc, val, it))
label_width = max(32, *(len(label) for (label, _, _) in rows))
for label, value, it in rows:
fv = fmt_value(value, it) # <-- pass attr to formatter
print(f" - {label:<{label_width}} {fv}")
if sub_device.attr_name == "cooling_water":
v1 = self.safe_get(self.cooling_water.op_cs_ecvw_0010)
v2 = self.safe_get(self.cooling_water.op_cs_ecvw_0020)
def closed(v):
return is_bool_like(v) and not bool(v)
if closed(v1) and closed(v2):
print(f"\n{cyan}Hint:{white} Both water cooling valves are CLOSED.\n" f"You can open them using: {bold}dev.x12saEPS.water_cooling_op(){white}")
# fmt: on
# ----------------------------------------------------------
# Consistency report
# ----------------------------------------------------------
# def consistency_report(self, *, verbose=True):
# missing = []
# dupes = []
# seen = {}
# for sub_device in self.walk_components():
# section = sub_device.name
# for walk in sub_device.walk_components():
# cpt: Cpt = walk.ancestors[-1]
# it: EpicsSignalRO = walk.item
# if not hasattr(self, it["attr"]):
# missing.append((section, it["attr"], it["label"], it["pv"]))
# pv = it["pv"]
# if pv in seen:
# dupes.append((pv, seen[pv], (section, it["attr"], it["label"])))
# else:
# seen[pv] = (section, it["attr"], it["label"])
# if verbose:
# print("=== Consistency Report ===")
# if missing:
# print("\nMissing attributes:")
# for sec, a, lbl, pv in missing:
# print(f" - [{sec}] {a} {lbl} pv={pv}")
# else:
# print("\nNo missing attributes.")
# if dupes:
# print("\nDuplicate PVs:")
# for pv, f1, f2 in dupes:
# print(f" {pv} → {f1} AND {f2}")
# else:
# print("\nNo duplicate PVs.")
# return {"missing_attrs": missing, "duplicate_pvs": dupes}

View File

@@ -1,67 +0,0 @@
"""
Shutter device for the cSAXS beamline with 2 PVs. One is connected to a
signal can be set to control the shutter signal, and the other is a readback signal
that can be monitored to check the shutter status as it may be controlled directly by
the delay generator."""
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind
class cSAXSFastEpicsShutter(Device):
"""
Fast EPICS shutter with automatic PV selection based on host subnet. IOC prefix is 'X12SA-ES1-TTL:'
"""
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "fshstatus_readback", "help"]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
# PVs
shutter = Cpt(EpicsSignal, "OUT_01", kind=Kind.normal, auto_monitor=True)
shutter_readback = Cpt(EpicsSignalRO, "INP_01", kind=Kind.normal, auto_monitor=True)
# -----------------------------------------------------
# User-facing shutter control functions
# -----------------------------------------------------
# pylint: disable=protetced-access
def fshopen(self) -> None:
"""Open the fast shutter."""
self.shutter.set(1).wait(timeout=self.shutter._timeout) # 2s default for ES
# pylint: disable=protetced-access
def fshclose(self) -> None:
"""Close the fast shutter."""
self.shutter.set(0).wait(timeout=self.shutter._timeout) # 2s default for ES
def fshstatus(self) -> int:
"""Return the fast shutter control status (0=closed, 1=open)."""
return self.shutter.get() # Ensure we have the latest value from EPICS
def fshstatus_readback(self) -> int:
"""Return the fast shutter status (0=closed, 1=open)."""
return self.shutter_readback.get() # Ensure we have the latest value from EPICS
def fshinfo(self) -> None:
"""Print information about which EPICS PV channel is being used."""
pvname = self.shutter.pvname
shutter_readback_pvname = self.shutter_readback.pvname
print(
f"Fast shutter connected to EPICS channel: {pvname} with shutter readback: {shutter_readback_pvname}"
)
def stop(self, *, success: bool = False) -> None:
"""Stop the shutter device. Make sure to close it."""
self.shutter.put(0)
super().stop(success=success)
def help(self):
"""Display available user methods."""
print("Available methods:")
for method in self.USER_ACCESS:
print(f" - {method}")
if __name__ == "__main__":
fsh = cSAXSFastEpicsShutter(name="fsh", prefix="X12SA-ES1-TTL:")

View File

@@ -22,13 +22,7 @@ import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd_devices import (
AsyncMultiSignal,
CompareStatus,
ProgressSignal,
StatusBase,
TransitionStatus,
)
from ophyd_devices import AsyncMultiSignal, CompareStatus, ProgressSignal, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.mcs_card.mcs_card import (
@@ -261,7 +255,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
**kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance).
"""
with self._rlock:
logger.info(f"Received update on mcs card {self.name}")
if self._omit_mca_callbacks.is_set():
return # Suppress callbacks when erasing all channels
self._mca_counter_index += 1
@@ -293,7 +286,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
)
# Once we have received all channels, push data to BEC and reset for next accumulation
logger.info(
logger.debug(
f"Received update for {attr_name}, index {self._mca_counter_index}/{self.NUM_MCA_CHANNELS}"
)
if len(self._current_data) == self.NUM_MCA_CHANNELS:
@@ -317,14 +310,10 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
old_value: Previous value of the signal.
value: New value of the signal.
"""
try:
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
self._scan_done_event.set()
except Exception:
content = traceback.format_exc()
logger.info(f"Device {self.name} error: {content}")
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
self._scan_done_event.set()
def on_stage(self) -> None:
"""
@@ -374,10 +363,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._num_total_triggers = triggers * num_points
self._acquisition_group = "monitored" if triggers == 1 else "burst_group"
self.preset_real.set(0).wait(timeout=self._pv_timeout)
if self.scan_info.msg.scan_type == "step":
self.num_use_all.set(triggers).wait(timeout=self._pv_timeout)
elif self.scan_info.msg.scan_type == "fly":
self.num_use_all.set(self._num_total_triggers).wait(timeout=self._pv_timeout)
self.num_use_all.set(triggers).wait(timeout=self._pv_timeout)
# Clear any previous data, just to be sure
with self._rlock:
@@ -399,21 +385,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._omit_mca_callbacks.clear()
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
# For a fly scan we need to start the mcs card ourselves
if self.scan_info.msg.scan_type == "fly":
self.erase_start.put(1)
def on_prescan(self) -> None | StatusBase:
"""
This method is called after on_stage and before the scan starts. For the MCS card, we need to make sure
that the card is properly started for fly scans. For step scans, this will be handled by the DDG,
so no action is required here.
"""
if self.scan_info.msg.scan_type == "fly":
status_acquiring = CompareStatus(self.acquiring, ACQUIRING.ACQUIRING)
self.cancel_on_stop(status_acquiring)
return status_acquiring
return None
def on_unstage(self) -> None:
"""
@@ -451,16 +422,9 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
hasattr(self.scan_info.msg, "num_points")
and self.scan_info.msg.num_points is not None
):
if self.scan_info.msg.scan_type == "step":
if self._current_data_index == self.scan_info.msg.num_points:
for callback in self._scan_done_callbacks:
callback(exception=None)
else:
logger.info(f"Current data index is {self._current_data_index}")
if self._current_data_index >= 1:
for callback in self._scan_done_callbacks:
callback(exception=None)
if self._current_data_index == self.scan_info.msg.num_points:
for callback in self._scan_done_callbacks:
callback(exception=None)
time.sleep(0.02) # 20ms delay to avoid busy loop
except Exception as exc: # pylint: disable=broad-except
content = traceback.format_exc()
@@ -488,6 +452,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
"""Callback for status failure, the monitoring thread should be stopped."""
# NOTE Check for status.done and status.success is important to avoid
if status.done:
self._start_monitor_async_data_emission.clear() # Stop monitoring
def on_complete(self) -> CompareStatus:
@@ -513,13 +478,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
monitoring thread is stopped properly.
"""
# NOTE For fly scans with EXT/EN enabled triggering, the MCS card needs to receive an
# additional trigger at the end of the scan to advance the channel. This will ensure
# that the acquisition finishes on the card and that data is emitted to BEC. If the acquisition
# was already finished (i.e. normal step scan sends 1 extra pulse per burst cycle), this will
# not have any effect as the card will already be in DONE state and signal.
self.software_channel_advance.put(1)
# Prepare and register status callback for the async monitoring loop
status_async_data = StatusBase(obj=self)
self._scan_done_callbacks.append(partial(self._status_callback, status_async_data))
@@ -533,7 +491,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# Combine both statuses
ret_status = status & status_async_data
# NOTE: Handle external stop/cancel, and stop monitoring
# Handle external stop/cancel, and stop monitoring
ret_status.add_callback(self._status_failed_callback)
self.cancel_on_stop(ret_status)
return ret_status

View File

@@ -156,7 +156,6 @@ class Camera:
camera_id (int): The ID of the camera device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
"""
def __init__(

View File

@@ -3,21 +3,19 @@
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
from typing import TYPE_CHECKING, Literal
import numpy as np
from ophyd import Component as Cpt, Signal, Kind
from bec_lib import messages
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger
@@ -45,8 +43,15 @@ class IDSCamera(PSIDeviceBase):
doc="Signal for the region of interest (ROI).",
async_update={"type": "add", "max_shape": [None]},
)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
USER_ACCESS = ["start_live_mode", "stop_live_mode", "mask", "set_rect_roi", "get_last_image"]
def __init__(
self,
@@ -83,15 +88,22 @@ class IDSCamera(PSIDeviceBase):
bits_per_pixel=bits_per_pixel,
connect=False,
)
self._live_mode = False
self._inputs = {"live_mode": live_mode}
self._mask = np.zeros((1, 1), dtype=np.uint8)
self.image.num_rotation_90 = num_rotation_90
self.image.transpose = transpose
self._force_monochrome = force_monochrome
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
self.live_mode_enabled.put(bool(live_mode))
############## Live Mode Methods ##############
def start_live_mode(self) -> None:
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
self.live_mode_enabled.put(False)
@property
def mask(self) -> np.ndarray:
"""Return the current region of interest (ROI) for the camera."""
@@ -114,22 +126,15 @@ class IDSCamera(PSIDeviceBase):
)
self._mask = value
@property
def live_mode(self) -> bool:
"""Return whether the camera is in live mode."""
return self._live_mode
@live_mode.setter
def live_mode(self, value: bool):
"""Set the live mode for the camera."""
if value != self._live_mode:
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
self._stop_live()
def _on_live_mode_enabled_changed(self, *args, value, **kwargs):
"""Callback for when live mode is changed."""
enabled = bool(value)
if enabled and self.cam._connected is False: # pylint: disable=protected-access
self.cam.on_connect()
if enabled:
self._start_live()
else:
self._stop_live()
def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera."""
@@ -196,7 +201,7 @@ class IDSCamera(PSIDeviceBase):
"""Connect to the camera."""
self.cam.force_monochrome = self._force_monochrome
self.cam.on_connect()
self.live_mode = self._inputs.get("live_mode", False)
self.live_mode_enabled.put(bool(self._inputs.get("live_mode", False)))
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
def on_destroy(self):
@@ -206,7 +211,7 @@ class IDSCamera(PSIDeviceBase):
def on_trigger(self):
"""Handle the trigger event."""
if not self.live_mode:
if not bool(self.live_mode_enabled.get()):
return
image = self.image.get()
if image is not None:

View File

@@ -1,48 +0,0 @@
# Overview
Integration module for Eiger detectors at the cSAXS beamline with JungfrauJoch backend.
There are currently two supported Eiger detectors:
- EIGER 1.5M
- EIGER 9M
This module provides a base integration for both detectors. A short list of useful
information is also provided below.
## JungfrauJoch Service
The JungfrauJoch WEB UI is available on http://sls-jfjoch-001:8080. This is an interface
to the broker which runs on sls-jfjoch-001.psi.ch. The writer service runs on
xbl-daq-34.psi.ch. Permissions to get access to these machines and run systemctl or
journalctl commands can be requested with the Infrastructure and Services group in AWI.
Beamline scientists need to check if they have the necessary permissions to connect
to these machines and run the commands below.
Useful commands for the broker service on sls-jfjoch-001.psi.ch:
- sudo systemctl status jfjoch_broker # Check status
- sudo systemctl start jfjoch_broker # Start service
- sudo systemctl stop jfjoch_broker # Stop service
- sudo systemctl restart jfjoch_broker # Restart service
For the writer service on xbl-daq-34.psi.ch:
- sudo journalctl -u jfjoch_writer -f # streams live logs
- sudo systemctl status jfjoch_writer # Check status
- sudo systemctl start jfjoch_writer # Start service
- sudo systemctl stop jfjoch_writer # Stop service
- sudo systemctl restart jfjoch_writer # Restart service
More information about the JungfrauJoch and API client can be found at: (https://jungfraujoch.readthedocs.io/en/latest/index.html)
### JungfrauJoch API Client
A thin wrapper for the JungfrauJoch API client is provided in the [jungfrau_joch_client](./jungfrau_joch_client.py).
Details about the specific integration are provided in the code.
## Eiger implementation
The Eiger detector integration is provided in the [eiger.py](./eiger.py) module. It provides a base integration for both Eiger 1.5M and Eiger 9M detectors.
Logic specific to each detector is implemented in the respective modules:
- [eiger_1_5m.py](./eiger_1_5m.py)
- [eiger_9m.py](./eiger_9m.py)
With the current implementation, the detector initialization should be done by a beamline scientist through the JungfrauJoch WEB UI by choosing the
appropriate detector (1.5M or 9M) before loading the device config with BEC. BEC will check upon connecting if the selected detector matches the expected one.
A preview stream for images is also provided which is forwarded and accessible through the `preview_image` signal.
For more specific details, please check the code documentation.

View File

@@ -1,23 +1,34 @@
"""
Generic integration of JungfrauJoch backend with Eiger detectors
for the cSAXS beamline at the Swiss Light Source.
Integration module for Eiger detectors at the cSAXS beamline with JungfrauJoch backend.
The WEB UI is available on http://sls-jfjoch-001:8080
A few notes on setup and operation of the Eiger detectors through the JungfrauJoch broker:
NOTE: this may not be the best place to store this information. It should be migrated to
beamline documentation for debugging of Eiger & JungfrauJoch.
The JungfrauJoch server for cSAXS runs on sls-jfjoch-001.psi.ch
User with sufficient rights may use:
- sudo systemctl restart jfjoch_broker
- sudo systemctl status jfjoch_broker
to check and/or restart the broker for the JungfrauJoch server.
Some extra notes for setting up the detector:
- If the energy on JFJ is set via DetectorSettings, the variable in DatasetSettings will be ignored
- Changes in energy may take time, good to implement logic that only resets energy if needed.
- For the Eiger, the frame_time_us in DetectorSettings is ignored, only the frame_time_us in
the DatasetSettings is relevant
- The bit_depth will be adjusted automatically based on the exp_time. Here, we need to ensure
that subsequent triggers properly consider the readout_time of the boards. For the Eiger detectors
at cSAXS, a readout time of 20us is configured through the JungfrauJoch deployment config. This
setting is sufficiently large for the detectors if they run in parallel mode.
that subsequent triggers properly
consider the readout_time of the boards. For Jungfrau detectors, the difference between
count_time_us and frame_time_us is the readout_time of the boards. For the Eiger, this needs
to be taken into account during the integration.
- beam_center and detector settings are required input arguments, thus, they may be set to wrong
values for acquisitions to start. Please keep this in mind.
Hardware related notes:
- If there is an HW issue with the detector, power cycling may help.
- The sls_detector package is available on console on /sls/x12sa/applications/erik/micromamba
- The sls_detector package is available on console on /sls/X12SA/data/gac-x12sa/erik/micromamba
- Run: source setup_9m.sh # Be careful, this connects to the detector, so it should not be
used during operation
- Useful commands:
@@ -28,6 +39,9 @@ Hardware related notes:
- cd power_control_user/
- ./on
- ./off
Further information that may be relevant for debugging:
JungfrauJoch - one needs to connect to the jfj-server (sls-jfjoch-001)
"""
from __future__ import annotations
@@ -70,19 +84,10 @@ class EigerError(Exception):
class Eiger(PSIDeviceBase):
"""
Base integration of the Eiger1.5M and Eiger9M at cSAXS.
Args:
name (str) : Name of the device
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
host (str): Hostname of the Jungfrau Joch server.
port (int): Port of the Jungfrau Joch server.
scan_info (ScanInfo): The scan info to use.
device_manager (DeviceManagerDS): The device manager to use.
**kwargs: Additional keyword arguments.
Base integration of the Eiger1.5M and Eiger9M at cSAXS. All relevant
"""
USER_ACCESS = ["set_detector_distance", "set_beam_center"]
USER_ACCESS = ["detector_distance", "beam_center"]
file_event = Cpt(FileEventSignal, name="file_event")
preview_image = Cpt(PreviewSignal, name="preview_image", ndim=2)
@@ -100,12 +105,23 @@ class Eiger(PSIDeviceBase):
device_manager=None,
**kwargs,
):
"""
Initialize the PSI Device Base class.
Args:
name (str) : Name of the device
detector_name (str): Name of the detector. Supports ["EIGER 9M", "EIGER 8.5M (tmp)", "EIGER 1.5M"]
host (str): Hostname of the Jungfrau Joch server.
port (int): Port of the Jungfrau Joch server.
scan_info (ScanInfo): The scan info to use.
device_manager (DeviceManagerDS): The device manager to use.
**kwargs: Additional keyword arguments.
"""
super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs)
self._host = f"{host}:{port}"
self.jfj_client = JungfrauJochClient(host=self._host, parent=self)
# NOTE: fetch this information from JungfrauJochClient during on_connected!
self.jfj_preview_client = JungfrauJochPreview(
url="tcp://129.129.95.114:5400", cb=self._preview_callback
url="tcp://129.129.95.114:5400", cb=self.preview_image.put
) # IP of sls-jfjoch-001.psi.ch on port 5400 for ZMQ stream
self.device_manager = device_manager
self.detector_name = detector_name
@@ -113,102 +129,53 @@ class Eiger(PSIDeviceBase):
self._beam_center = beam_center
self._readout_time = readout_time
self._full_path = ""
self._num_triggers = 0
self._wait_for_on_complete = 20 # seconds
if self.device_manager is not None:
self.device_manager: DeviceManagerDS
def _preview_callback(self, message: dict) -> None:
"""
Callback method for handling preview messages as received from the JungfrauJoch preview stream.
These messages are dictionary dumps as described in the JFJ ZMQ preview stream documentation.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
message (dict): The message received from the preview stream.
"""
if message.get("type", "") == "image":
data = message.get("data", {}).get("default", None)
if data is None:
logger.error(f"Received image message on device {self.name} without data.")
return
logger.info(f"Received preview image on device {self.name}")
self.preview_image.put(data)
# pylint: disable=missing-function-docstring
@property
def detector_distance(self) -> float:
"""The detector distance in mm."""
return self._detector_distance
@detector_distance.setter
def detector_distance(self, value: float) -> None:
"""Set the detector distance in mm."""
if value <= 0:
raise ValueError("Detector distance must be a positive value.")
self._detector_distance = value
def set_detector_distance(self, distance: float) -> None:
"""
Set the detector distance in mm.
Args:
distance (float): The detector distance in mm.
"""
self.detector_distance = distance
# pylint: disable=missing-function-docstring
@property
def beam_center(self) -> tuple[float, float]:
"""The beam center in pixels. (x,y)"""
return self._beam_center
@beam_center.setter
def beam_center(self, value: tuple[float, float]) -> None:
if any(coord < 0 for coord in value):
raise ValueError("Beam center coordinates must be non-negative.")
"""Set the beam center in pixels. (x,y)"""
self._beam_center = value
def set_beam_center(self, x: float, y: float) -> None:
"""
Set the beam center coordinates in pixels.
Args:
x (float): The x coordinate of the beam center in pixels.
y (float): The y coordinate of the beam center in pixels.
"""
self.beam_center = (x, y)
def on_init(self) -> None:
"""Hook called during device initialization."""
# pylint: disable=arguments-differ
def wait_for_connection(self, timeout: float = 10) -> None:
"""
Wait for the device to be connected to the JungfrauJoch backend.
Called when the device is initialized.
Args:
timeout (float): Timeout in seconds to wait for the connection.
No siganls are connected at this point,
thus should not be set here but in on_connected instead.
"""
self.jfj_client.api.status_get(_request_timeout=timeout) # If connected, this responds
def on_connected(self) -> None:
"""
Hook called after the device is connected to through the device server.
Called after the device is connected and its signals are connected.
Default values for signals should be set here. Currently, the detector needs to be
initialised manually through the WEB UI of JungfrauJoch. Once agreed upon, the automated
initialisation can be re-enabled here (code commented below).
Default values for signals should be set here.
"""
start_time = time.time()
logger.debug(f"On connected called for {self.name}")
self.jfj_client.stop(request_timeout=3)
# Check which detector is selected
# Get available detectors
available_detectors = self.jfj_client.api.config_select_detector_get(_request_timeout=5)
logger.debug(f"Available detectors {available_detectors}")
# Get current detector
current_detector_name = ""
if available_detectors.current_id is not None:
if available_detectors.current_id:
detector_selection = [
det.description
for det in available_detectors.detectors
@@ -223,9 +190,8 @@ class Eiger(PSIDeviceBase):
raise RuntimeError(
f"Detector {self.detector_name} is not in IDLE state, current state: {self.jfj_client.detector_state}. Please initialize the detector in the WEB UI: {self._host}."
)
# TODO - Currently the initialisation of the detector is done manually through the WEB UI. Once adjusted
# this can be automated here again.
# TODO - check again once Eiger should be initialized automatically, currently human initialization is expected
# # Once the automation should be enabled, we may use here
# detector_selection = [
# det for det in available_detectors.detectors if det.id == self.detector_name
# ]
@@ -241,51 +207,41 @@ class Eiger(PSIDeviceBase):
# Setup Detector settings, here we may also set the energy already as this might be time consuming
settings = DetectorSettings(frame_time_us=int(500), timing=DetectorTiming.TRIGGER)
self.jfj_client.set_detector_settings(settings, timeout=5)
self.jfj_client.set_detector_settings(settings, timeout=10)
# Set the file writer to the appropriate output for the HDF5 file
file_writer_settings = FileWriterSettings(overwrite=True, format=FileWriterFormat.NXMXVDS)
logger.debug(
f"Setting writer_settings: {yaml.dump(file_writer_settings.to_dict(), indent=4)}"
)
# Setup the file writer settings
self.jfj_client.api.config_file_writer_put(
file_writer_settings=file_writer_settings, _request_timeout=10
)
# Start the preview client
self.jfj_preview_client.connect()
self.jfj_preview_client.start()
logger.info(
f"Device {self.name} initialized after {time.time()-start_time:.2f}s. Preview stream connected on url: {self.jfj_preview_client.url}"
)
logger.info(f"Connected to JungfrauJoch preview stream at {self.jfj_preview_client.url}")
def on_stage(self) -> DeviceStatus | None:
"""
Hook called when staging the device. Information about the upcoming scan can be accessed from the scan_info object.
scan_msg = self.scan_info.msg
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info object.
"""
start_time = time.time()
scan_msg = self.scan_info.msg
# TODO: Check mono energy from device in BEC
# Setting incident energy in keV
# Set acquisition parameter
# TODO add check of mono energy, this can then also be passed to DatasetSettings
incident_energy = 12.0
# Setting up exp_time and num_triggers acquisition parameter
exp_time = scan_msg.scan_parameters.get("exp_time", 0)
if exp_time <= self._readout_time: # Exp_time must be at least the readout time
if exp_time <= self._readout_time:
raise ValueError(
f"Value error on device {self.name}: Exposure time {exp_time}s is less than readout time {self._readout_time}s."
f"Receive scan request for scan {scan_msg.scan_name} with exp_time {exp_time}s, which must be larger than the readout time {self._readout_time}s of the detector {self.detector_name}."
)
self._num_triggers = int(
scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]
)
# Setting up the full path for file writing
frame_time_us = exp_time #
ntrigger = int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"])
# Fetch file path
self._full_path = get_full_path(scan_msg, name=f"{self.name}_master")
self._full_path = os.path.abspath(os.path.expanduser(self._full_path))
# Inform BEC about upcoming file event
self.file_event.put(
file_path=self._full_path,
@@ -293,14 +249,11 @@ class Eiger(PSIDeviceBase):
successful=False,
hinted_h5_entries={"data": "entry/data/data"},
)
# JFJ adds _master.h5 automatically
path = os.path.relpath(self._full_path, start="/sls/x12sa/data").removesuffix("_master.h5")
# Create dataset settings for API call.
data_settings = DatasetSettings(
image_time_us=int(exp_time * 1e6),
ntrigger=self._num_triggers,
image_time_us=int(frame_time_us * 1e6), # This is currently ignored
ntrigger=ntrigger,
file_prefix=path,
beam_x_pxl=int(self._beam_center[0]),
beam_y_pxl=int(self._beam_center[1]),
@@ -308,15 +261,11 @@ class Eiger(PSIDeviceBase):
incident_energy_ke_v=incident_energy,
)
logger.debug(f"Setting data_settings: {yaml.dump(data_settings.to_dict(), indent=4)}")
prep_time = time.time()
self.jfj_client.wait_for_idle(timeout=10) # Ensure we are in IDLE state
prep_time = start_time - time.time()
logger.debug(f"Prepared information for eiger to start acquisition in {prep_time:.2f}s")
self.jfj_client.wait_for_idle(timeout=10, request_timeout=10) # Ensure we are in IDLE state
self.jfj_client.start(settings=data_settings) # Takes around ~0.6s
# Time the stage process
logger.info(
f"Device {self.name} staged for scan. Time spent {time.time()-start_time:.2f}s,"
f" with {time.time()-prep_time:.2f}s spent with communication to JungfrauJoch."
)
logger.debug(f"Wait for IDLE and start call took {time.time()-start_time-prep_time:.2f}s")
def on_unstage(self) -> DeviceStatus:
"""Called while unstaging the device."""
@@ -329,9 +278,7 @@ class Eiger(PSIDeviceBase):
def _file_event_callback(self, status: DeviceStatus) -> None:
"""Callback to update the file_event signal when the acquisition is done."""
logger.debug(
f"File event callback on complete status for device {self.name}: done={status.done}, successful={status.success}"
)
logger.info(f"Acquisition done callback called for {self.name} for status {status.success}")
self.file_event.put(
file_path=self._full_path,
done=status.done,
@@ -340,44 +287,19 @@ class Eiger(PSIDeviceBase):
)
def on_complete(self) -> DeviceStatus:
"""
Called at the end of the scan. The method should implement an asynchronous wait for the
device to complete the acquisition. A callback to update the file_event signal is
attached that resolves the file event when the acquisition is done.
Returns:
DeviceStatus: The status object representing the completion of the acquisition.
"""
"""Called to inquire if a device has completed a scans."""
def wait_for_complete():
start_time = time.time()
# NOTE: This adjust the time (s) that should be waited for completion of the scan.
timeout = self._wait_for_on_complete
while time.time() - start_time < timeout:
if self.jfj_client.wait_for_idle(timeout=1, raise_on_timeout=False):
# TODO: Once available, add check for
statistics: MeasurementStatistics = (
self.jfj_client.api.statistics_data_collection_get(_request_timeout=5)
)
if statistics.images_collected < self._num_triggers:
raise EigerError(
f"Device {self.name} acquisition incomplete. "
f"Expected {self._num_triggers} triggers, "
f"but only {statistics.images_collected} were collected."
)
timeout = 10
for _ in range(timeout):
if self.jfj_client.wait_for_idle(timeout=1, request_timeout=10):
return
logger.info(
f"Waiting for device {self.name} to finish complete, time elapsed: "
f"{time.time() - start_time}."
)
statistics: MeasurementStatistics = self.jfj_client.api.statistics_data_collection_get(
_request_timeout=5
)
broker_status = self.jfj_client.jfj_status
raise TimeoutError(
f"Timeout after waiting for device {self.name} to complete for {time.time()-start_time:.2f}s \n \n"
f"Broker status: \n{yaml.dump(broker_status.to_dict(), indent=4)} \n \n"
f"Measurement statistics: \n{yaml.dump(statistics.to_dict(), indent=4)}"
f"Timeout after waiting for detector {self.name} to complete for {time.time()-start_time:.2f}s, measurement statistics: {yaml.dump(statistics.to_dict(), indent=4)}"
)
status = self.task_handler.submit_task(wait_for_complete, run=True)
@@ -390,11 +312,7 @@ class Eiger(PSIDeviceBase):
def on_stop(self) -> None:
"""Called when the device is stopped."""
self.jfj_client.stop(request_timeout=0.5)
self.jfj_client.stop(
request_timeout=0.5
) # Call should not block more than 0.5 seconds to stop all devices...
self.task_handler.shutdown()
def on_destroy(self):
"""Called when the device is destroyed."""
self.jfj_preview_client.stop()
self.on_stop()
return super().on_destroy()

View File

@@ -21,18 +21,18 @@ if TYPE_CHECKING: # pragma no cover
from bec_server.device_server.device_server import DeviceManagerDS
EIGER9M_READOUT_TIME_US = 500e-6 # 500 microseconds in s
DETECTOR_NAME = "EIGER 9M" # "EIGER 9M""
DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M""
# pylint:disable=invalid-name
class Eiger9M(Eiger):
"""
EIGER 9M specific integration for the in-vaccum Eiger.
Eiger 1.5M specific integration for the in-vaccum Eiger.
The logic implemented here is coupled to the DelayGenerator integration,
repsonsible for the global triggering of all devices through a single Trigger logic.
Please check the eiger.py class for more details about the integration of relevant backend
services. The detector_name must be set to "EIGER 9M":
services. The detector_name must be set to "EIGER 1.5M:
"""
USER_ACCESS = Eiger.USER_ACCESS + [] # Add more user_access methods here.

View File

@@ -1,15 +1,13 @@
"""Module with a thin client wrapper around the Jungfrau Joch detector API"""
"""Module with client interface for the Jungfrau Joch detector API"""
from __future__ import annotations
import enum
import threading
import time
import traceback
from typing import TYPE_CHECKING
import requests
import yaml
from bec_lib.logger import bec_logger
from jfjoch_client.api.default_api import DefaultApi
from jfjoch_client.api_client import ApiClient
@@ -20,7 +18,7 @@ from jfjoch_client.models.detector_settings import DetectorSettings
logger = bec_logger.logger
if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING:
from ophyd import Device
@@ -31,10 +29,7 @@ class JungfrauJochClientError(Exception):
class DetectorState(str, enum.Enum):
"""
Enum states of the BrokerStatus state. The pydantic model validates in runtime,
thus we keep the possible states here for a convenient overview and access.
"""
"""Possible Detector states for Jungfrau Joch detector"""
INACTIVE = "Inactive"
IDLE = "Idle"
@@ -45,15 +40,13 @@ class DetectorState(str, enum.Enum):
class JungfrauJochClient:
"""
Jungfrau Joch API client wrapper. It provides a thin wrapper methods around the API client,
that allow to connect, initialise, wait for state changes, set settings, start and stop
acquisitions.
"""Thin wrapper around the Jungfrau Joch API client.
Args:
host (str): Hostname of the Jungfrau Joch broker service.
Default is "http://sls-jfjoch-001:8080"
parent (Device, optional): Parent ophyd device, used for logging purposes.
sudo systemctl restart jfjoch_broker
sudo systemctl status jfjoch_broker
It looks as if the detector is not being stopped properly.
One module remains running, how can we restart the detector?
"""
def __init__(
@@ -66,63 +59,50 @@ class JungfrauJochClient:
self._parent_name = parent.name if parent else self.__class__.__name__
@property
def jfj_status(self) -> BrokerStatus:
"""Broker status of JungfrauJoch."""
def jjf_state(self) -> BrokerStatus:
"""Get the status of JungfrauJoch"""
response = self.api.status_get()
return BrokerStatus(**response.to_dict())
# pylint: disable=missing-function-docstring
@property
def initialised(self) -> bool:
"""Check if jfj is connected and ready to receive commands"""
return self._initialised
@initialised.setter
def initialised(self, value: bool) -> None:
"""Set the connected status"""
self._initialised = value
# pylint: disable=missing-function-docstring
# TODO this is not correct, as it may be that the state in INACTIVE. Models are not in sync...
# REMOVE all model enums as most of the validation takes place in the Pydantic models, i.e. BrokerStatus here..
@property
def detector_state(self) -> DetectorState:
return DetectorState(self.jfj_status.state)
"""Get the status of JungfrauJoch"""
return DetectorState(self.jjf_state.state)
def connect_and_initialise(self, timeout: int = 10) -> None:
"""
Connect and initialise the JungfrauJoch detector. The detector must be in
IDLE state to become initialised. This is a blocking call, the timeout parameter
will be passed to the HTTP requests timeout method of the wait_for_idle method.
Args:
timeout (int): Timeout in seconds for the initialisation and waiting for IDLE state.
"""
def connect_and_initialise(self, timeout: int = 10, **kwargs) -> None:
"""Check if JungfrauJoch is connected and ready to receive commands"""
status = self.detector_state
# TODO: #135 Check if the detector has to be in INACTIVE state before initialisation
if status != DetectorState.IDLE:
self.api.initialize_post()
self.wait_for_idle(timeout)
self.api.initialize_post() # This is a blocking call....
self.wait_for_idle(timeout, request_timeout=timeout) # Blocking call
self.initialised = True
def set_detector_settings(self, settings: dict | DetectorSettings, timeout: int = 10) -> None:
"""
Set the detector settings. The state of JungfrauJoch must be in IDLE,
Error or Inactive state. Please note: a full set of setttings has to be provided,
otherwise the settings will be overwritten with default values.
"""Set the detector settings. JungfrauJoch must be in IDLE, Error or Inactive state.
Note, the full settings have to be provided, otherwise the settings will be overwritten with default values.
Args:
settings (dict): dictionary of settings
timeout (int): Timeout in seconds for the HTTP request to set the settings.
"""
state = self.detector_state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
logger.info(
f"JungfrauJoch backend fo device {self._parent_name} is not in IDLE state,"
" waiting 1s before retrying..."
)
time.sleep(1) # Give the detector 1s to become IDLE, retry
state = self.detector_state
if state not in [DetectorState.IDLE, DetectorState.ERROR, DetectorState.INACTIVE]:
raise JungfrauJochClientError(
f"Error on {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE"
" state to set settings. Current state: {state}"
f"Error in {self._parent_name}. Detector must be in IDLE, ERROR or INACTIVE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
@@ -130,36 +110,28 @@ class JungfrauJochClient:
try:
self.api.config_detector_put(detector_settings=settings, _request_timeout=timeout)
except requests.exceptions.Timeout:
raise TimeoutError(
f"Timeout on device {self._parent_name} while setting detector settings:\n "
f"{yaml.dump(settings, indent=4)}."
)
raise TimeoutError(f"Timeout while setting detector settings for {self._parent_name}")
except Exception:
content = traceback.format_exc()
logger.error(
f"Error on device {self._parent_name} while setting detector settings:\n "
f"{yaml.dump(settings, indent=4)}. Error traceback: {content}"
)
raise JungfrauJochClientError(
f"Error on device {self._parent_name} while setting detector settings:\n "
f"{yaml.dump(settings, indent=4)}. Full traceback: {content}."
f"Error while setting detector settings for {self._parent_name}: {content}"
)
def start(self, settings: dict | DatasetSettings, request_timeout: float = 10) -> None:
"""
Start the acquisition with the provided dataset settings.
The detector must be in IDLE state. Settings must always provide a full set of
parameters, missing parameters will be set to default values.
"""Start the mesaurement. DatasetSettings must be provided, and JungfrauJoch must be in IDLE state.
The method call is blocking and JungfrauJoch will be ready to measure after the call resolves.
Args:
settings (dict | DatasetSettings): Dataset settings to start the acquisition with.
request_timeout (float): Timeout in sec for the HTTP request to start the acquisition.
settings (dict): dictionary of settings
Please check the DataSettings class for the available settings. Minimum required settings are
beam_x_pxl, beam_y_pxl, detector_distance_mm, incident_energy_keV.
"""
state = self.detector_state
if state != DetectorState.IDLE:
raise JungfrauJochClientError(
f"Error on device {self._parent_name}. "
f"Detector must be in IDLE state to start acquisition. Current state: {state}"
f"Error in {self._parent_name}. Detector must be in IDLE state to set settings. Current state: {state}"
)
if isinstance(settings, dict):
@@ -169,80 +141,46 @@ class JungfrauJochClient:
dataset_settings=settings, _request_timeout=request_timeout
)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout error after {request_timeout} seconds on device {self._parent_name} "
f"during 'start' call with dataset settings: {yaml.dump(settings, indent=4)}. \n"
f"Traceback: {content}"
)
raise TimeoutError(
f"Timeout error after {request_timeout} seconds on device {self._parent_name} "
f"during 'start' call with dataset settings: {yaml.dump(settings, indent=4)}."
f"TimeoutError in JungfrauJochClient for parent device {self._parent_name} for 'start' call"
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error on device {self._parent_name} during 'start' post with dataset settings: \n"
f"{yaml.dump(settings, indent=4)}. \nTraceback: {content}"
)
raise JungfrauJochClientError(
f"Error on device {self._parent_name} during 'start' post with dataset settings: \n"
f"{yaml.dump(settings, indent=4)}. \nTraceback: {content}."
f"Error in JungfrauJochClient for parent device {self._parent_name} during 'start' call: {content}"
)
def stop(self, request_timeout: float = 0.5) -> None:
"""Stop the acquisition, this only logs errors and is not raising."""
try:
self.api.cancel_post_with_http_info(_request_timeout=request_timeout)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
except Exception:
content = traceback.format_exc()
logger.error(
f"Error in JungFrauJochClient for device {self._parent_name} during stop: {content}"
)
def _stop_call(self):
try:
self.api.cancel_post_with_http_info() # (_request_timeout=request_timeout)
except requests.exceptions.Timeout:
content = traceback.format_exc()
logger.error(
f"Timeout error after {request_timeout} seconds on device {self._parent_name} "
f"during stop: {content}"
)
except Exception:
content = traceback.format_exc()
logger.error(f"Error on device {self._parent_name} during stop: {content}")
thread = threading.Thread(
target=_stop_call, daemon=True, args=(self,), name="stop_jungfraujoch_thread"
)
thread.start()
def wait_for_idle(self, timeout: int = 10, raise_on_timeout: bool = True) -> bool:
"""
Method to wait until the detector is in IDLE state. This is a blocking call with a
timeout that can be specified. The additional parameter raise_on_timeout can be used to
raise an exception on timeout instead of returning boolean True/False.
def wait_for_idle(self, timeout: int = 10, request_timeout: float | None = None) -> bool:
"""Wait for JungfrauJoch to be in Idle state. Blocking call with timeout.
Args:
timeout (int): timeout in seconds
raise_on_timeout (bool): If True, raises an exception on timeout. Default is True.
Returns:
bool: True if the detector is in IDLE state, False if timeout occurred
"""
if request_timeout is None:
request_timeout = timeout
try:
self.api.wait_till_done_post(timeout=timeout, _request_timeout=timeout)
self.api.wait_till_done_post(timeout=timeout, _request_timeout=request_timeout)
except requests.exceptions.Timeout:
raise TimeoutError(f"HTTP request timeout in wait_for_idle for {self._parent_name}")
except Exception:
content = traceback.format_exc()
logger.info(
f"Timeout after {timeout} seconds on device {self._parent_name} in wait_for_idle: {content}"
)
if raise_on_timeout:
raise TimeoutError(
f"Timeout after {timeout} seconds on device {self._parent_name} in wait_for_idle."
)
return False
except Exception as exc:
content = traceback.format_exc()
logger.info(
f"Error on device {self._parent_name} in wait_for_idle. Full traceback: {content}"
)
if raise_on_timeout:
raise JungfrauJochClientError(
f"Error on device {self._parent_name} in wait_for_idle: {content}"
) from exc
logger.debug(f"Waiting for device {self._parent_name} to become IDLE: {content}")
return False
return True

View File

@@ -1,136 +1,22 @@
"""
Module for the JungfrauJoch preview ZMQ stream for the Eiger detector at cSAXS.
The Preview client is implemented for the JungfrauJoch ZMQ PUB-SUB interface, and
should be independent of the EIGER detector type.
The client connects to the ZMQ PUB-SUB preview stream and calls a user provided callback
function with the decompressed messages received from the stream. The callback needs to be
able to deal with the different message types sent by the JungfrauJoch server ("start",
"image", "end") as described in the JungfrauJoch ZEROMQ preview stream documentation.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
"""
"""Module for the Eiger preview ZMQ stream."""
from __future__ import annotations
import json
import threading
import time
from typing import Callable
import cbor2
import numpy as np
import zmq
from bec_lib.logger import bec_logger
from dectris.compression import decompress
logger = bec_logger.logger
###############################
###### CBOR TAG DECODERS ######
###############################
# Dectris specific CBOR tags and decoders for Jungfrau data
# Reference:
# https://github.com/dectris/documentation/blob/main/stream_v2/examples/client.py
def decode_multi_dim_array(tag: cbor2.CBORTag, column_major: bool = False):
"""Decode a multi-dimensional array from a CBOR tag."""
dimensions, contents = tag.value
if isinstance(contents, list):
array = np.empty((len(contents),), dtype=object)
array[:] = contents
elif isinstance(contents, (np.ndarray, np.generic)):
array = contents
else:
raise cbor2.CBORDecodeValueError("expected array or typed array")
return array.reshape(dimensions, order="F" if column_major else "C")
def decode_typed_array(tag: cbor2.CBORTag, dtype: str):
"""Decode a typed array from a CBOR tag."""
if not isinstance(tag.value, bytes):
raise cbor2.CBORDecodeValueError("expected byte string in typed array")
return np.frombuffer(tag.value, dtype=dtype)
def decode_dectris_compression(tag: cbor2.CBORTag):
"""Decode a Dectris compressed array from a CBOR tag."""
algorithm, elem_size, encoded = tag.value
return decompress(encoded, algorithm, elem_size=elem_size)
#########################################
#### Dectris CBOR TAG Extensions ########
#########################################
# Mapping of various additional CBOR tags from Dectris to decoder functions
tag_decoders = {
40: lambda tag: decode_multi_dim_array(tag, column_major=False),
64: lambda tag: decode_typed_array(tag, dtype="u1"),
65: lambda tag: decode_typed_array(tag, dtype=">u2"),
66: lambda tag: decode_typed_array(tag, dtype=">u4"),
67: lambda tag: decode_typed_array(tag, dtype=">u8"),
68: lambda tag: decode_typed_array(tag, dtype="u1"),
69: lambda tag: decode_typed_array(tag, dtype="<u2"),
70: lambda tag: decode_typed_array(tag, dtype="<u4"),
71: lambda tag: decode_typed_array(tag, dtype="<u8"),
72: lambda tag: decode_typed_array(tag, dtype="i1"),
73: lambda tag: decode_typed_array(tag, dtype=">i2"),
74: lambda tag: decode_typed_array(tag, dtype=">i4"),
75: lambda tag: decode_typed_array(tag, dtype=">i8"),
77: lambda tag: decode_typed_array(tag, dtype="<i2"),
78: lambda tag: decode_typed_array(tag, dtype="<i4"),
79: lambda tag: decode_typed_array(tag, dtype="<i8"),
80: lambda tag: decode_typed_array(tag, dtype=">f2"),
81: lambda tag: decode_typed_array(tag, dtype=">f4"),
82: lambda tag: decode_typed_array(tag, dtype=">f8"),
83: lambda tag: decode_typed_array(tag, dtype=">f16"),
84: lambda tag: decode_typed_array(tag, dtype="<f2"),
85: lambda tag: decode_typed_array(tag, dtype="<f4"),
86: lambda tag: decode_typed_array(tag, dtype="<f8"),
87: lambda tag: decode_typed_array(tag, dtype="<f16"),
1040: lambda tag: decode_multi_dim_array(tag, column_major=True),
56500: lambda tag: decode_dectris_compression(tag), # pylint: disable=unnecessary-lambda
}
def tag_hook(decoder, tag: int):
"""
Tag hook for the cbor2.loads method. Both arguments "decoder" and "tag" mus be present.
We use the tag to choose the respective decoder from the tag_decoders registry if available.
"""
tag_decoder = tag_decoders.get(tag.tag)
return tag_decoder(tag) if tag_decoder else tag
######################
#### ZMQ Settings ####
######################
ZMQ_TOPIC_FILTER = b"" # Subscribe to all topics
ZMQ_CONFLATE_SETTING = 1 # Keep only the most recent message
ZMQ_RCVHWM_SETTING = 1 # Set high water mark to 1, this configures the max number of queue messages
#################################
#### Jungfrau Preview Client ####
#################################
ZMQ_TOPIC_FILTER = b""
class JungfrauJochPreview:
"""
Preview client for the JungfrauJoch ZMQ preview stream. The client is started with
a URL to receive the data from the JungfrauJoch PUB-SUB preview interface, and a
callback function that is called with messages received from the preview stream.
The callback needs to be able to deal with the different message types sent
by the JungfrauJoch server ("start", "image", "end") as described in the
JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
url (str): ZMQ PUB-SUB preview stream URL.
cb (Callable): Callback function called with messages received from the stream.
"""
USER_ACCESS = ["start", "stop"]
def __init__(self, url: str, cb: Callable):
@@ -141,18 +27,16 @@ class JungfrauJochPreview:
self._on_update_callback = cb
def connect(self):
"""
Connect to the JungfrauJoch PUB-SUB streaming interface. If the connection is refused
it will reattempt a second time after a one second delay.
"""Connect to the JungfrauJoch PUB-SUB streaming interface
JungfrauJoch may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
# pylint: disable=no-member
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.CONFLATE, ZMQ_CONFLATE_SETTING)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
self._socket.setsockopt(zmq.RCVHWM, ZMQ_RCVHWM_SETTING)
try:
self._socket.connect(self.url)
except ConnectionRefusedError:
@@ -160,26 +44,17 @@ class JungfrauJochPreview:
self._socket.connect(self.url)
def start(self):
"""Start the ZMQ update loop in a background thread."""
self._zmq_thread = threading.Thread(
target=self._zmq_update_loop, daemon=True, name="JungfrauJoch_live_preview"
)
self._zmq_thread.start()
def stop(self):
"""Stop the ZMQ update loop and wait for the thread to finish."""
self._shutdown_event.set()
if self._zmq_thread:
self._zmq_thread.join(timeout=1.0)
self._zmq_thread.join()
def _zmq_update_loop(self, poll_interval: float = 0.2):
"""
ZMQ update loop running in a background thread. The polling is throttled by
the poll_interval parameter.
Args:
poll_interval (float): Time in seconds to wait between polling attempts.
"""
def _zmq_update_loop(self):
while not self._shutdown_event.is_set():
if self._socket is None:
self.connect()
@@ -189,21 +64,18 @@ class JungfrauJochPreview:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
logger.debug(
f"ZMQ Again exception, receive queue is empty for JFJ preview at {self.url}."
)
finally:
# We throttle the polling to avoid heavy load on the device server
time.sleep(poll_interval)
# Happens when receive queue is empty
time.sleep(0.1)
def _poll(self):
"""
Poll the ZMQ socket for new data. We are currently subscribing and unsubscribing
for each poll loop to avoid receiving too many messages. Throttling of the update
loop is handled in the _zmq_update_loop method.
Poll the ZMQ socket for new data. It will throttle the data update and
only subscribe to the topic for a single update. This is not very nice
but it seems like there is currently no option to set the update rate on
the backend.
"""
if self._shutdown_event.is_set():
if self._shutdown_event.wait(0.2):
return
try:
@@ -218,19 +90,7 @@ class JungfrauJochPreview:
# Unsubscribe from the topic
self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)
def _parse_data(self, bytes_list: list[bytes]):
"""
Parse the received ZMQ data from the JungfrauJoch preview stream.
We will call the _on_update_callback with the decompressed messages as a dictionary.
The callback needs to be able to deal with the different message types sent
by the JungfrauJoch server ("start", "image", "end") as described in the
JungfrauJoch ZEROMQ preview stream documentation. Messages are dictionary dumps.
(https://jungfraujoch.readthedocs.io/en/latest/ZEROMQ_STREAM.html#preview-stream).
Args:
bytes_list (list[bytes]): List of byte messages received from ZMQ recv_multipart.
"""
for byte_msg in bytes_list:
msg = cbor2.loads(byte_msg, tag_hook=tag_hook)
self._on_update_callback(msg)
def _parse_data(self, data):
# TODO decode and parse the data
# self._on_update_callback(data)
pass

View File

@@ -498,9 +498,6 @@ class RtFlomniController(Controller):
)
# while scan is running
while mode > 0:
#TODO here?: scan abortion if no progress in scan *raise error
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
time.sleep(0.01)
@@ -632,8 +629,6 @@ class RtFlomniMotor(Device, PositionerBase):
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
connectionTimeout = 20
def __init__(
self,
axis_Id,

View File

@@ -1,87 +1,82 @@
"""
Fast Shutter control for OMNY setup. If started with a config file in which the device_manager
has a 'fsh' device (cSAXSFastEpicsShutter), this device will be used as the shutter.
Otherwise, the device will create a dummy shutter device that will log warnings when shutter
methods are called, but will not raise exceptions.
"""
from bec_lib.logger import bec_logger
import time
import socket
from ophyd import Component as Cpt
from ophyd import Device, Signal
from ophyd_devices import PSIDeviceBase
logger = bec_logger.logger
from ophyd import Device
from ophyd import EpicsSignal
class OMNYFastShutter(PSIDeviceBase, Device):
class OMNYFastEpicsShutterError(Exception):
pass
def _detect_host_pv():
"""Detect host subnet and return appropriate PV name."""
try:
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
if local_ip.startswith("129.129.122."):
return "X12SA-ES1-TTL:OUT_01"
else:
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
except Exception as ex:
print(f"Warning: could not detect IP subnet ({ex}), using dummy shutter.")
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
class OMNYFastEpicsShutter(Device):
"""
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
the shutter control methods (fshopen, fshclose, fshstatus, fshinfo) from the
cSAXSFastEpicsShutter device. The device is identified by the 'fsh' name in the device manager.
If the 'fsh' device is not found in the device manager, this device will create a dummy shutter
and log warnings when shutter methods are called, but will not raise exceptions.
Fast EPICS shutter with automatic PV selection based on host subnet.
"""
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help", "fshstatus_readback"]
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help"]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
shutter = Cpt(Signal, name="shutter")
# PV is detected dynamically at import time
shutter = Cpt(EpicsSignal, name="shutter", read_pv=_detect_host_pv(), auto_monitor=True)
def __init__(self, prefix="", *, name, **kwargs):
super().__init__(prefix, name=name, **kwargs)
self.shutter.subscribe(self._emit_value)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self.wait_for_connection()
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
# -----------------------------------------------------
# User-facing shutter control functions
# -----------------------------------------------------
# pylint: disable=invalid-name
def _check_if_cSAXS_shutter_exists_in_config(self) -> bool:
"""
Check on the device manager if the shutter device exists.
Returns:
bool: True if the 'fsh' device exists in the device manager, False otherwise
"""
if self.device_manager.devices.get("fsh", None) is None:
logger.warning(f"Fast shutter device not found for {self.name}.")
return False
return True
def fshopen(self):
"""Open the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
return self.device_manager.devices["fsh"].fshopen()
else:
self.shutter.put(1)
try:
self.shutter.put(1, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}")
def fshclose(self):
"""Close the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
return self.device_manager.devices["fsh"].fshclose()
else:
self.shutter.put(0)
try:
self.shutter.put(0, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}")
def fshstatus(self):
"""Return the fast shutter status (0=closed, 1=open)."""
if self._check_if_cSAXS_shutter_exists_in_config():
return self.device_manager.devices["fsh"].fshstatus()
else:
try:
return self.shutter.get()
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}")
def fshinfo(self):
"""Print information about which EPICS PV channel is being used."""
if self._check_if_cSAXS_shutter_exists_in_config():
return self.device_manager.devices["fsh"].fshinfo()
else:
print("Using dummy fast shutter device. No EPICS channel is connected.")
pvname = self.shutter.pvname
print(f"Fast shutter connected to EPICS channel: {pvname}")
return pvname
def help(self):
"""Display available user methods."""
print("Available methods:")
for method in self.USER_ACCESS:
print(f" - {method}")
def fshstatus_readback(self):
"""Return the fast shutter status (0=closed, 1=open) from the readback signal."""
if self._check_if_cSAXS_shutter_exists_in_config():
return self.device_manager.devices["fsh"].fshstatus_readback()
else:
self.shutter.get()

View File

@@ -2,7 +2,7 @@ import requests
import threading
import cv2
import numpy as np
from ophyd import Device, Component as Cpt
from ophyd import Device, Component as Cpt, Kind, Signal
from ophyd_devices import PreviewSignal
import traceback
@@ -13,6 +13,13 @@ logger = bec_logger.logger
class WebcamViewer(Device):
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None:
super().__init__(name=name, **kwargs)
@@ -21,20 +28,54 @@ class WebcamViewer(Device):
self._update_thread = None
self._buffer = b""
self._shutdown_event = threading.Event()
self._live_mode_lock = threading.RLock()
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None:
if self._connection is not None:
return
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
self._apply_live_mode(bool(value))
def _apply_live_mode(self, enabled: bool) -> None:
with self._live_mode_lock:
if enabled:
if self._update_thread is not None and self._update_thread.is_alive():
return
self._shutdown_event.clear()
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
return
if self._update_thread is None:
return
self._shutdown_event.set()
if self._connection is not None:
try:
self._connection.close()
except Exception: # pylint: disable=broad-except
pass
self._connection = None
self._update_thread.join(timeout=2)
if self._update_thread.is_alive():
logger.warning("Webcam live mode thread did not stop within timeout.")
return
self._update_thread = None
self._buffer = b""
self._shutdown_event.clear()
def _update_loop(self) -> None:
while not self._shutdown_event.is_set():
try:
self._connection = requests.get(self.url, stream=True)
self._connection = requests.get(self.url, stream=True, timeout=5)
for chunk in self._connection.iter_content(chunk_size=1024):
if self._shutdown_event.is_set():
break
self._buffer += chunk
start = self._buffer.find(b'\xff\xd8') # JPEG start
end = self._buffer.find(b'\xff\xd9') # JPEG end
@@ -50,16 +91,3 @@ class WebcamViewer(Device):
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Image update loop failed: {content}")
def stop_live_mode(self) -> None:
if self._connection is None:
return
self._shutdown_event.set()
if self._connection is not None:
self._connection.close()
self._connection = None
if self._update_thread is not None:
self._update_thread.join()
self._update_thread = None
self._shutdown_event.clear()

View File

@@ -1,74 +1,46 @@
from ophyd import Component as Cpt
import numpy as np
from ophyd import Component as Cpt, Signal, EpicsSignal
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
class OMNYXRayAlignGUI(Device):
class OMNYXRayEpicsGUI(Device):
save_frame = Cpt(
EpicsSignal, name="save_frame", read_pv="XOMNYI-XEYE-SAVFRAME:0",auto_monitor=True
)
update_frame_acqdone = Cpt(
EpicsSignal, name="update_frame_acqdone", read_pv="XOMNYI-XEYE-ACQDONE:0",auto_monitor=True
)
update_frame_acq = Cpt(
EpicsSignal, name="update_frame_acq", read_pv="XOMNYI-XEYE-ACQ:0",auto_monitor=True
)
width_y_dynamic = {
f"width_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YWIDTH_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_y = Dcpt(width_y_dynamic)
width_x_dynamic = {
f"width_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XWIDTH_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_x = Dcpt(width_x_dynamic)
enable_mv_x = Cpt(
EpicsSignal, name="enable_mv_x", read_pv="XOMNYI-XEYE-ENAMVX:0",auto_monitor=True
)
enable_mv_y = Cpt(
EpicsSignal, name="enable_mv_y", read_pv="XOMNYI-XEYE-ENAMVY:0",auto_monitor=True
)
send_message = Cpt(
EpicsSignal, name="send_message", read_pv="XOMNYI-XEYE-MESSAGE:0.DESC",auto_monitor=True
)
sample_name = Cpt(
EpicsSignal, name="sample_name", read_pv="XOMNYI-XEYE-SAMPLENAME:0.DESC",auto_monitor=True
)
angle = Cpt(
EpicsSignal, name="angle", read_pv="XOMNYI-XEYE-ANGLE:0",auto_monitor=True
)
pixel_size = Cpt(
EpicsSignal, name="pixel_size", read_pv="XOMNYI-XEYE-PIXELSIZE:0",auto_monitor=True
)
update_frame_acqdone = Cpt(Signal, value=0)
update_frame_acq = Cpt(Signal, value=0)
enable_mv_x = Cpt(Signal, value=0)
enable_mv_y = Cpt(Signal, value=0)
send_message = Cpt(Signal, value=0)
sample_name = Cpt(Signal, value=0)
angle = Cpt(Signal, value=0)
pixel_size = Cpt(Signal, value=0)
submit = Cpt(
EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True
)
step = Cpt(
EpicsSignal, name="step", read_pv="XOMNYI-XEYE-STEP:0",auto_monitor=True
)
xval_x_dynamic = {
f"xval_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XVAL_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
xval_x = Dcpt(xval_x_dynamic)
yval_y_dynamic = {
f"yval_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YVAL_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
yval_y = Dcpt(yval_y_dynamic)
recbg = Cpt(
EpicsSignal, name="recbg", read_pv="XOMNYI-XEYE-RECBG:0",auto_monitor=True
)
stage_pos_x_dynamic = {
f"stage_pos_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-STAGEPOSX:{i}", {"auto_monitor": True}) for i in range(1, 6)
}
stage_pos_x = Dcpt(stage_pos_x_dynamic)
mvx = Cpt(
EpicsSignal, name="mvx", read_pv="XOMNYI-XEYE-MVX:0",auto_monitor=True
)
mvy = Cpt(
EpicsSignal, name="mvy", read_pv="XOMNYI-XEYE-MVY:0",auto_monitor=True
)
)
step = Cpt(Signal, value=0)
recbg = Cpt(Signal, value=0)
mvx = Cpt(Signal, value=0)
mvy = Cpt(Signal, value=0)
fit_array = Cpt(Signal, value=np.zeros((3, 10)))
fit_params_x = Cpt(Signal, value=np.zeros((2, 3)))
fit_params_y = Cpt(Signal, value=np.zeros((2, 3)))
# Generate width_y_0 to width_y_10
for i in range(11):
locals()[f'width_y_{i}'] = Cpt(Signal, value=0)
# Generate width_x_0 to width_x_10
for i in range(11):
locals()[f'width_x_{i}'] = Cpt(Signal, value=0)
# Generate xval_x_0 to xval_x_10
for i in range(11):
locals()[f'xval_x_{i}'] = Cpt(Signal, value=0)
# Generate yval_y_0 to yval_y_10
for i in range(11):
locals()[f'yval_y_{i}'] = Cpt(Signal, value=0)
# Generate stage_pos_x_1 to stage_pos_x_5
for i in range(1, 6):
locals()[f'stage_pos_x_{i}'] = Cpt(Signal, value=0)

View File

@@ -27,7 +27,6 @@ from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import SyncFlyScanBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import TRIGGERSOURCE
logger = bec_logger.logger
@@ -74,13 +73,14 @@ class FlomniFermatScan(SyncFlyScanBase):
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)
"""
super().__init__(parameter=parameter, exp_time=exp_time, **kwargs)
super().__init__(parameter=parameter, **kwargs)
self.show_live_table = False
self.axis = []
self.fovx = fovx
self.fovy = fovy
self.cenx = cenx
self.ceny = ceny
self.exp_time = exp_time
self.step = step
self.zshift = zshift
self.angle = angle
@@ -151,9 +151,6 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.stubs.send_rpc_and_wait("rty", "set", self.positions[0][1])
def _prepare_setup_part2(self):
# Prepare DDG1 to use
yield from self.stubs.send_rpc_and_wait("ddg1", "set_trigger", TRIGGERSOURCE.EXT_RISING_EDGE.value)
if self.flomni_rotation_status:
self.flomni_rotation_status.wait()
@@ -310,10 +307,6 @@ class FlomniFermatScan(SyncFlyScanBase):
logger.warning("No positions found to return to start")
def cleanup(self):
yield from self.stubs.send_rpc_and_wait("ddg1", "set_trigger", TRIGGERSOURCE.SINGLE_SHOT.value)
yield from super().cleanup()
def run(self):
self.initialize()
yield from self.read_scan_motors()

View File

@@ -25,8 +25,6 @@ dependencies = [
"bec_widgets",
"zmq",
"opencv-python",
"dectris-compression", # for JFJ preview stream decompression
"cbor2",
]
[project.optional-dependencies]

View File

@@ -1,14 +0,0 @@
"""
Conftest runs for all tests in this directory and subdirectories. Thereby, we know for
certain that the SocketSignal.READBACK_TIMEOUT is set to 0 for all tests, which prevents
hanging tests when a readback is attempted on a non-connected socket.
"""
# conftest.py
import pytest
from ophyd_devices.utils.socket import SocketSignal
@pytest.fixture(autouse=True)
def patch_socket_timeout(monkeypatch):
monkeypatch.setattr(SocketSignal, "READBACK_TIMEOUT", 0.0)

View File

@@ -282,11 +282,10 @@ def test_ddg1_stage(mock_ddg1: DDG1):
mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time
mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
mock_ddg1.fast_shutter_control._read_pv.mock_data = 0 # Simulate shutter control
mock_ddg1.stage()
shutter_width = mock_ddg1._shutter_to_open_delay + exp_time * frames_per_trigger
shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3
assert np.isclose(mock_ddg1.burst_mode.get(), 1) # burst mode is enabled
assert np.isclose(mock_ddg1.burst_delay.get(), 0)
@@ -303,25 +302,6 @@ def test_ddg1_stage(mock_ddg1: DDG1):
assert np.isclose(mock_ddg1.ef.width.get(), 1e-6)
assert mock_ddg1.staged == ophyd.Staged.yes
mock_ddg1.unstage()
# Test if shutter is kept open..
mock_ddg1.fast_shutter_control._read_pv.mock_data = 1 # Simulate shutter control is kept open
# Test method
mock_ddg1.keep_shutter_open_during_scan(True)
shutter_width = mock_ddg1._shutter_to_open_delay + exp_time * frames_per_trigger
assert np.isclose(
shutter_width, exp_time * frames_per_trigger
) # Shutter to open delay is not added as shutter is kept open
# Simulate fly scan, so no extra trigger for MCS card.
mock_ddg1.scan_info.msg.scan_type = "fly"
mock_ddg1.stage()
# Shutter channel cd
assert np.isclose(mock_ddg1.cd.delay.get(), 0)
assert np.isclose(mock_ddg1.cd.width.get(), shutter_width)
# MCS channel ef or gate
assert np.isclose(mock_ddg1.ef.delay.get(), 0)
assert np.isclose(mock_ddg1.ef.width.get(), 0) # No triggering of MCS due to shutter fly scan
def test_ddg1_on_trigger(mock_ddg1: DDG1):
@@ -351,28 +331,9 @@ def test_ddg1_on_trigger(mock_ddg1: DDG1):
#################################
with mock.patch.object(ddg, "_prepare_mcs_on_trigger") as mock_prepare_mcs:
mock_prepare_mcs.return_value = ophyd.StatusBase(done=True, success=True)
# MCS card is present and enabled, should call prepare_mcs_on_trigger
# and the status should resolve once acuiring goes from 1 to 0.
status = ddg.trigger()
assert status.done is False
mcs = ddg.device_manager.devices.get("mcs", None)
assert mcs is not None
mcs.acquiring._read_pv.mock_data = 1 # Simulate acquiring started
assert status.done is False
mcs.acquiring._read_pv.mock_data = 0 # Simulate acquiring stopped
status.wait(timeout=1) # Wait for the status to be done
assert status.done is True
assert status.success is True
mock_prepare_mcs.assert_called_once()
# Now we disable the mcs card, and trigger again. This should not call prepare_mcs_on_trigger
# and should fallback to polling the DDG for END_OF_BURST status bit.
# Disable mcs card
mcs.enabled = False
status = ddg.trigger()
# Check that the poll thread run event is set
# Careful in debugger, there is a timeout based on the exp_time + 5s default
assert ddg._poll_thread_run_event.is_set()
assert not ddg._poll_thread_poll_loop_done.is_set()

View File

@@ -5,7 +5,6 @@ from time import time
from typing import TYPE_CHECKING, Generator
from unittest import mock
import numpy as np
import pytest
from bec_lib.messages import FileMessage, ScanStatusMessage
from jfjoch_client.models.broker_status import BrokerStatus
@@ -79,7 +78,7 @@ def detector_list(request) -> Generator[DetectorList, None, None]:
),
DetectorListElement(
id=2,
description="EIGER 9M",
description="EIGER 8.5M (tmp)",
serial_number="123456",
base_ipv4_addr="192.168.0.1",
udp_interface_count=1,
@@ -104,11 +103,7 @@ def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]:
name = "eiger_1_5m"
dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0)
dev.scan_info.msg = mock_scan_info
try:
yield dev
finally:
if dev._destroyed is False:
dev.destroy()
yield dev
@pytest.fixture(scope="function")
@@ -118,19 +113,7 @@ def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]:
name = "eiger_9m"
dev = Eiger9M(name=name)
dev.scan_info.msg = mock_scan_info
try:
yield dev
finally:
if dev._destroyed is False:
dev.destroy()
def test_eiger_wait_for_connection(eiger_1_5m, eiger_9m):
"""Test the wait_for_connection metho is calling status_get on the JFJ API client."""
for eiger in (eiger_1_5m, eiger_9m):
with mock.patch.object(eiger.jfj_client.api, "status_get") as mock_status_get:
eiger.wait_for_connection(timeout=1)
mock_status_get.assert_called_once_with(_request_timeout=1)
yield dev
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
@@ -158,7 +141,7 @@ def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state):
else:
eiger.on_connected()
assert mock_set_det.call_args == mock.call(
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
)
assert mock_file_writer.call_args == mock.call(
file_writer_settings=FileWriterSettings(
@@ -196,7 +179,7 @@ def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state):
else:
eiger.on_connected()
assert mock_set_det.call_args == mock.call(
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5
DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
)
assert mock_file_writer.call_args == mock.call(
file_writer_settings=FileWriterSettings(
@@ -233,39 +216,11 @@ def test_eiger_on_stop(eiger_1_5m):
stop_event.wait(timeout=5) # Thread should be killed from task_handler
def test_eiger_on_destroy(eiger_1_5m):
"""Test the on_destroy logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
start_event = threading.Event()
stop_event = threading.Event()
def tmp_task():
start_event.set()
try:
while True:
time.sleep(0.1)
finally:
stop_event.set()
eiger.task_handler.submit_task(tmp_task)
start_event.wait(timeout=5)
with (
mock.patch.object(eiger.jfj_preview_client, "stop") as mock_jfj_preview_client_stop,
mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
):
eiger.on_destroy()
mock_jfj_preview_client_stop.assert_called_once()
mock_jfj_client_stop.assert_called_once()
stop_event.wait(timeout=5)
@pytest.mark.timeout(25)
@pytest.mark.parametrize("raise_timeout", [True, False])
def test_eiger_on_complete(eiger_1_5m, raise_timeout):
"""Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
eiger._wait_for_on_complete = 1 # reduce wait time for testing
callback_completed_event = threading.Event()
@@ -275,7 +230,7 @@ def test_eiger_on_complete(eiger_1_5m, raise_timeout):
unblock_wait_for_idle = threading.Event()
def mock_wait_for_idle(timeout: float, raise_on_timeout: bool) -> bool:
def mock_wait_for_idle(timeout: int, request_timeout: float):
if unblock_wait_for_idle.wait(timeout):
if raise_timeout:
return False
@@ -283,18 +238,11 @@ def test_eiger_on_complete(eiger_1_5m, raise_timeout):
return False
with (
mock.patch.object(
eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state="Idle")
),
mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle),
mock.patch.object(
eiger.jfj_client.api,
"statistics_data_collection_get",
return_value=MeasurementStatistics(
run_number=1,
images_collected=eiger.scan_info.msg.num_points
* eiger.scan_info.msg.scan_parameters["frames_per_trigger"],
),
return_value=MeasurementStatistics(run_number=1),
),
):
status = eiger.complete()
@@ -336,7 +284,7 @@ def test_eiger_file_event_callback(eiger_1_5m, tmp_path):
assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
def test_eiger_on_stage(eiger_1_5m):
def test_eiger_on_sage(eiger_1_5m):
"""Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
scan_msg = eiger.scan_info.msg
@@ -368,35 +316,3 @@ def test_eiger_on_stage(eiger_1_5m):
)
assert mock_start.call_args == mock.call(settings=data_settings)
assert eiger.staged is Staged.yes
def test_eiger_set_det_distance_test_beam_center(eiger_1_5m):
"""Test the set_detector_distance and set_beam_center methods. Equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
old_distance = eiger.detector_distance
new_distance = old_distance + 100
old_beam_center = eiger.beam_center
new_beam_center = (old_beam_center[0] + 20, old_beam_center[1] + 50)
eiger.set_detector_distance(new_distance)
assert eiger.detector_distance == new_distance
eiger.set_beam_center(x=new_beam_center[0], y=new_beam_center[1])
assert eiger.beam_center == new_beam_center
with pytest.raises(ValueError):
eiger.set_beam_center(x=-10, y=100) # Cannot set negative beam center
with pytest.raises(ValueError):
eiger.detector_distance = -50 # Cannot set negative detector distance
def test_eiger_preview_callback(eiger_1_5m):
"""Preview callback test for the Eiger detector. This is equivalent for 9M and 1_5M."""
eiger = eiger_1_5m
# NOTE: I don't find models for the CBOR messages used by JFJ, currently using a dummay dict.
# Please adjust once the proper model is found.
for msg_type in ["start", "end", "image", "calibration", "metadata"]:
msg = {"type": msg_type, "data": {"default": np.array([[1, 2], [3, 4]])}}
with mock.patch.object(eiger.preview_image, "put") as mock_preview_put:
eiger._preview_callback(msg)
if msg_type == "image":
mock_preview_put.assert_called_once_with(msg["data"]["default"])
else:
mock_preview_put.assert_not_called()

View File

@@ -1,37 +0,0 @@
"""Module to test epics devices."""
import pytest
from ophyd_devices.tests.utils import patched_device
from csaxs_bec.devices.epics.fast_shutter import cSAXSFastEpicsShutter
@pytest.fixture
def fast_shutter_device():
"""Fixture to create a patched cSAXSFastEpicsShutter device for testing."""
with patched_device(cSAXSFastEpicsShutter, name="fsh", prefix="X12SA-ES1-TTL:") as device:
yield device
def test_fast_shutter_methods(fast_shutter_device):
"""Test the user-facing methods of the cSAXSFastEpicsShutter device."""
assert fast_shutter_device.name == "fsh", "Device name should be 'fsh'"
assert fast_shutter_device.prefix == "X12SA-ES1-TTL:", "Device prefix is 'X12SA-ES1-TTL:'"
# Test fshopen and fshclose
fast_shutter_device.fshopen()
assert fast_shutter_device.shutter.get() == 1, "Shutter should be open (1) after fshopen()"
assert fast_shutter_device.fshstatus() == 1, "fshstatus should return 1 when shutter is open"
fast_shutter_device.fshclose()
assert fast_shutter_device.shutter.get() == 0, "Shutter should be closed (0) after fshclose()"
assert fast_shutter_device.fshstatus() == 0, "fshstatus should return 0 when shutter is closed"
# shutter_readback is connected to separate PV.
fast_shutter_device.shutter_readback._read_pv.mock_data = 1 # Simulate readback showing open
assert (
fast_shutter_device.fshstatus_readback() == 1
), "fshstatus_readback should return 1 when shutter readback shows open"
fast_shutter_device.shutter_readback._read_pv.mock_data = 0 # Simulate readback showing closed
assert (
fast_shutter_device.fshstatus_readback() == 0
), "fshstatus_readback should return 0 when shutter readback shows closed"

View File

@@ -1,99 +0,0 @@
# pylint: skip-file
from __future__ import annotations
import pytest
from ophyd_devices.tests.utils import patched_device
from csaxs_bec.devices.epics.eps import EPS
ALL_PVS = [
# ALARMS
"X12SA-EPS-PLC:AlarmCnt_EPS",
"ARS00-MIS-PLC-01:AlarmCnt_Frontends",
# FRONTEND VALVES
"X12SA-FE-VVPG-0000:PLC_OPEN",
"X12SA-FE-VVPG-1010:PLC_OPEN",
"X12SA-FE-VVFV-2010:PLC_OPEN",
"X12SA-FE-VVPG-2010:PLC_OPEN",
# Optics VALVES
"X12SA-OP-VVPG-1010:PLC_OPEN",
"X12SA-OP-VVPG-2010:PLC_OPEN",
"X12SA-OP-VVPG-3010:PLC_OPEN",
"X12SA-OP-VVPG-3020:PLC_OPEN",
"X12SA-OP-VVPG-4010:PLC_OPEN",
"X12SA-OP-VVPG-5010:PLC_OPEN",
"X12SA-OP-VVPG-6010:PLC_OPEN",
"X12SA-OP-VVPG-7010:PLC_OPEN",
# Endstation VALVES
"X12SA-ES-VVPG-1010:PLC_OPEN",
# Frontend SHUTTERS
"X12SA-FE-PSH1-EMLS-0010:OPEN",
"X12SA-FE-STO1-EMLS-0010:OPEN",
# Optics SHUTTERS
"X12SA-OP-PSH1-EMLS-7010:OPEN",
# DMM Monochromator
"X12SA-OP-DMM-ETTC-3010:TEMP",
"X12SA-OP-DMM-ETTC-3020:TEMP",
"X12SA-OP-DMM-ETTC-3030:TEMP",
"X12SA-OP-DMM-ETTC-3040:TEMP",
"X12SA-OP-DMM-EMLS-3010:THRU",
"X12SA-OP-DMM-EMLS-3020:IN",
"X12SA-OP-DMM-EMLS-3030:THRU",
"X12SA-OP-DMM-EMLS-3040:IN",
"X12SA-OP-DMM-EMSW-3050:SWITCH",
"X12SA-OP-DMM-EMSW-3060:SWITCH",
"X12SA-OP-DMM-EMSW-3070:SWITCH",
"X12SA-OP-DMM1:ENERGY-GET",
"X12SA-OP-DMM1:POSITION",
"X12SA-OP-DMM1:STRIPE",
# CCM Monochromator
"X12SA-OP-CCM-ETTC-4010:TEMP",
"X12SA-OP-CCM-ETTC-4020:TEMP",
"X12SA-OP-CCM-EMSW-4010:SWITCH",
"X12SA-OP-CCM-EMSW-4020:SWITCH",
"X12SA-OP-CCM-EMSW-4030:SWITCH",
"X12SA-OP-CCM1:ENERGY-GET",
"X12SA-OP-CCM1:POSITION",
# Water Cooling
"X12SA-OP-SL1-EFSW-2010:FLOW",
"X12SA-OP-SL2-EFSW-2010:FLOW",
"X12SA-OP-EB1-EFSW-5010:FLOW",
"X12SA-OP-EB1-EFSW-5020:FLOW",
"X12SA-OP-SL3-EFSW-5010:FLOW",
"X12SA-OP-KB-EFSW-6010:FLOW",
"X12SA-OP-PSH1-EFSW-7010:FLOW",
"X12SA-ES-EB2-EFSW-1010:FLOW",
"X12SA-OP-CS-ECVW-0010:PLC_OPEN",
"X12SA-OP-CS-ECVW-0020:PLC_OPEN",
# Request PVs
"X12SA-EPS-PLC:ACKERR-REQUEST",
"X12SA-OP-CS-ECVW:PLC_REQUEST",
]
@pytest.fixture
def eps():
dev_name = "EPS"
with patched_device(EPS, name=dev_name) as eps:
yield eps
def test_eps_has_signals(eps):
"""Test that all expected PVs are present in the eps device."""
found_pvs = [walk.item._read_pv.pvname for walk in eps.walk_signals()]
assert set(found_pvs) == set(
ALL_PVS
), f"Expected PVs {ALL_PVS} but found {set(ALL_PVS) - set(found_pvs)}"
# pylint: disable=line-too-long
expected_show_all_output = "\x1b[1mX12SA EPS status\x1b[0m\n\n\x1b[1mEPS Alarms\x1b[0m\n - X12SA EPS Alarm count 0\n - FrontEnd MIS Alarm count 0\n\n\x1b[1mValves Frontend\x1b[0m\n - FE-VVPG-0000 \x1b[91mCLOSED\x1b[0m\n - FE-VVPG-1010 \x1b[91mCLOSED\x1b[0m\n - FE-VVFV-2010 \x1b[91mCLOSED\x1b[0m\n - FE-VVPG-2010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mValves Optics Hutch\x1b[0m\n - OP-VVPG-1010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-2010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-3010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-3020 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-4010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-5010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-6010 \x1b[91mCLOSED\x1b[0m\n - OP-VVPG-7010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mValves ES Hutch\x1b[0m\n - ES-VVPG-1010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mShutters Frontend\x1b[0m\n - FE-PSH1-EMLS-0010 \x1b[91mCLOSED\x1b[0m\n - FE-STO1-EMLS-0010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mShutters Endstation\x1b[0m\n - OP-PSH1-EMLS-7010 \x1b[91mCLOSED\x1b[0m\n\n\x1b[1mDMM Monochromator\x1b[0m\n - DMM Temp Surface 1 0.0\n - DMM Temp Surface 2 0.0\n - DMM Temp Shield 1 (disaster) 0.0\n - DMM Temp Shield 2 (disaster) 0.0\n - DMM Translation ThruPos \x1b[91mINACTIVE\x1b[0m\n - DMM Translation InPos \x1b[91mINACTIVE\x1b[0m\n - DMM Bragg ThruPos \x1b[91mINACTIVE\x1b[0m\n - DMM Bragg InPos \x1b[91mINACTIVE\x1b[0m\n - DMM Heater Fault XTAL 1 \x1b[92mOK\x1b[0m\n - DMM Heater Fault XTAL 2 \x1b[92mOK\x1b[0m\n - DMM Heater Fault Support 1 \x1b[92mOK\x1b[0m\n - DMM Energy 0.0000\n - DMM Position out of beam\n - DMM Stripe Stripe 1 W/B4C\n\n\x1b[1mCCM Monochromator\x1b[0m\n - CCM Temp Crystal 0.0\n - CCM Temp Shield (disaster) 0.0\n - CCM Heater Fault 1 \x1b[92mOK\x1b[0m\n - CCM Heater Fault 2 \x1b[92mOK\x1b[0m\n - CCM Heater Fault 3 \x1b[92mOK\x1b[0m\n - CCM Energy 0.0000\n - CCM Position out of beam\n\n\x1b[1mCooling Water\x1b[0m\n - OP-SL1-EFSW-2010 \x1b[91mFAIL\x1b[0m\n - OP-SL2-EFSW-2010 \x1b[91mFAIL\x1b[0m\n - OP-EB1-EFSW-5010 \x1b[91mFAIL\x1b[0m\n - OP-EB1-EFSW-5020 \x1b[91mFAIL\x1b[0m\n - OP-SL3-EFSW-5010 \x1b[91mFAIL\x1b[0m\n - OP-KB-EFSW-6010 \x1b[91mFAIL\x1b[0m\n - OP-PSH1-EFSW-7010 \x1b[91mFAIL\x1b[0m\n - ES-EB2-EFSW-1010 \x1b[91mFAIL\x1b[0m\n - OP-CS-ECVW-0010 \x1b[91mCLOSED\x1b[0m\n - OP-CS-ECVW-0020 \x1b[91mCLOSED\x1b[0m\n\n\x1b[96mHint:\x1b[0m Both water cooling valves are CLOSED.\nYou can open them using: \x1b[1mdev.x12saEPS.water_cooling_op()\x1b[0m\n"
def test_eps_show_all(eps, capsys):
"""Test that the show_all method outputs the expected status."""
eps.show_all()
output = capsys.readouterr().out
assert (
output == expected_show_all_output
), f"Expected output does not match actual output.\nExpected:\n{expected_show_all_output}\nActual:\n{output}"