Compare commits

..

13 Commits

Author SHA1 Message Date
cffc979858 wip client regenerated
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m50s
CI for csaxs_bec / test (pull_request) Failing after 1m52s
2026-03-02 21:09:18 +01:00
55fa9d073a wip flomni gui tools adopted to V3 2026-03-02 21:09:18 +01:00
dd54a22c00 wip x_ray_eye adopted to V3 2026-03-02 21:09:15 +01:00
58868eba26 wip removed old imports 2026-03-02 21:07:43 +01:00
d198f88387 WIP ptycho_floimni_sim simulation for local development 2026-03-02 21:07:43 +01:00
x12sa
116ef7d360 wip adjusting to V3 2026-03-02 21:07:43 +01:00
x01dc
625651247c set fixed on the script 2026-03-02 21:07:43 +01:00
x01dc
055542c216 colorbar changed to greys 2026-03-02 21:07:41 +01:00
24ddb30cad fix(camera): unify the live mode on cameras 2026-03-02 21:06:25 +01:00
x01dc
bbb7dfe48c feat(xray_eye): alignment gui and script adapted to not use epics gui device 2026-03-02 21:06:03 +01:00
6d1cd1f488 fix(ids_camera): live mode signal 2026-03-02 21:03:59 +01:00
1256b603de feat(allied-vision-camera): Add allied vision camera integration 2026-03-02 21:03:59 +01:00
4d69f8f90f fix(bec_widgets): removed omny alignment old gui
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-02 21:00:14 +01:00
21 changed files with 1683 additions and 771 deletions

View File

@@ -1006,6 +1006,20 @@ class FlomniAlignmentMixin:
with open(os.path.join(dir_path, "ptychotomoalign_Cy3.txt"), "r") as file:
tomo_alignment_fit[1][4] = file.readline()
tomo_alignment_fit[0][0] = gui.flomni.xeyegui.XRayEye.Waveform.fit_x_SineModel.dap_params['amplitude']
tomo_alignment_fit[0][1] = gui.flomni.xeyegui.XRayEye.Waveform.fit_x_SineModel.dap_params['frequency']
tomo_alignment_fit[0][2] = gui.flomni.xeyegui.XRayEye.Waveform.fit_x_SineModel.dap_params['shift']
tomo_alignment_fit[1][0] = gui.flomni.xeyegui.XRayEye.Waveform_0.fit_y_SineModel.dap_params['amplitude']
tomo_alignment_fit[1][1] = gui.flomni.xeyegui.XRayEye.Waveform_0.fit_y_SineModel.dap_params['frequency']
tomo_alignment_fit[1][2] = gui.flomni.xeyegui.XRayEye.Waveform_0.fit_y_SineModel.dap_params['shift']
print("applying vertical default values from mirror calibration, not from fit!")
tomo_alignment_fit[1][0] = 0
tomo_alignment_fit[1][1] = 0
tomo_alignment_fit[1][2] = 0
tomo_alignment_fit[1][3] = 0
tomo_alignment_fit[1][4] = 0
print("New alignment parameters loaded:")
print(
f"X Amplitude {tomo_alignment_fit[0][0]}, "

View File

@@ -1,7 +1,5 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
@@ -9,6 +7,7 @@ if builtins.__dict__.get("bec") is not None:
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
@@ -28,10 +27,11 @@ class flomniGuiTools:
self.gui = self.client.gui
def flomnigui_show_gui(self):
if "flomni" in self.gui.windows:
self.gui.flomni.show()
else:
self.gui.new("flomni")
self.gui.new("flomni")
# if "flomni" in self.gui.windows:
# self.gui.flomni.show()
# else:
# self.gui.new("flomni")
def flomnigui_stop_gui(self):
self.gui.flomni.hide()
@@ -39,37 +39,37 @@ class flomniGuiTools:
def flomnigui_raise(self):
self.gui.flomni.raise_window()
# def flomnigui_show_xeyealign(self):
# self.flomnigui_show_gui()
# if self.xeyegui is None:
# self.flomnigui_remove_all_docks()
# self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# # start live
# if not dev.cam_xeye.live_mode:
# dev.cam_xeye.live_mode = True
def flomnigui_show_xeyealign(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# start live
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
self.xeyegui = self.gui.flomni.new("XRayEye", object_name="xrayeye")
# start live
if not dev.cam_xeye.live_mode_enabled.get():
# dev.cam_xeye.live_mode = True
dev.cam_xeye.live_mode_enabled.put(True)
self.xeyegui.switch_tab("alignment")
def flomnigui_show_xeyealign_fittab(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("XRayEye")
self.xeyegui.switch_tab("fit")
def _flomnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui,"flomni"):
if hasattr(self.gui.flomni,attribute_name):
if hasattr(self.gui, "flomni"):
if hasattr(self.gui.flomni, attribute_name):
return False
return True
def flomnigui_show_cameras(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"):
if self._flomnigui_check_attribute_not_exists(
"camera_gripper"
) or self._flomnigui_check_attribute_not_exists("camera_overview"):
self.flomnigui_remove_all_docks()
camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
camera_gripper_image = self.gui.flomni.new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
camera_gripper_image.image(("cam_flomni_gripper", "preview"))
camera_gripper_image.lock_aspect_ratio = True
@@ -80,7 +80,7 @@ class flomniGuiTools:
dev.cam_flomni_gripper.start_live_mode()
else:
print("Cannot open camera_gripper. Device does not exist.")
camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
camera_overview_image = self.gui.flomni.new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
camera_overview_image.image(("cam_flomni_overview", "preview"))
camera_overview_image.lock_aspect_ratio = True
@@ -93,10 +93,10 @@ class flomniGuiTools:
print("Cannot open camera_overview. Device does not exist.")
def flomnigui_remove_all_docks(self):
#dev.cam_flomni_overview.stop_live_mode()
#dev.cam_flomni_gripper.stop_live_mode()
#dev.cam_xeye.live_mode = False
self.gui.flomni.delete_all()
# dev.cam_flomni_overview.stop_live_mode()
# dev.cam_flomni_gripper.stop_live_mode()
# dev.cam_xeye.live_mode = False
self.gui.flomni.delete_all()
self.progressbar = None
self.text_box = None
@@ -104,7 +104,7 @@ class flomniGuiTools:
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("idle_text_box"):
self.flomnigui_remove_all_docks()
idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
idle_text_box = self.gui.flomni.new("TextBox")
text = (
"<pre>"
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n"
@@ -120,13 +120,12 @@ class flomniGuiTools:
import csaxs_bec
from pathlib import Path
print("The general flOMNI documentation is at \nhttps://sls-csaxs.readthedocs.io/en/latest/user/ptychography/flomni.html#user-ptychography-flomni")
print(
"The general flOMNI documentation is at \nhttps://sls-csaxs.readthedocs.io/en/latest/user/ptychography/flomni.html#user-ptychography-flomni"
)
csaxs_bec_basepath = Path(csaxs_bec.__file__).parent
docs_folder = (
csaxs_bec_basepath /
"bec_ipython_client" / "plugins" / "flomni" / "docs"
)
docs_folder = csaxs_bec_basepath / "bec_ipython_client" / "plugins" / "flomni" / "docs"
if not docs_folder.is_dir():
raise NotADirectoryError(f"Docs folder not found: {docs_folder}")
@@ -164,10 +163,9 @@ class flomniGuiTools:
self.pdf_viewer = self.gui.flomni.new(widget="PdfViewerWidget")
# --- Load PDF ---------------------------------------------------------
self.pdf_viewer.PdfViewerWidget.load_pdf(str(pdf_file.resolve()))
self.pdf_viewer.load_pdf(str(pdf_file.resolve()))
print(f"\nLoaded: {pdf_file.name}\n")
def _flomnicam_check_device_exists(self, device):
try:
device
@@ -181,7 +179,7 @@ class flomniGuiTools:
if self._flomnigui_check_attribute_not_exists("progressbar"):
self.flomnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar")
self.progressbar = self.gui.flomni.new("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
@@ -196,7 +194,7 @@ class flomniGuiTools:
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.flomni.new(name="progress_text").new("TextBox")
self.text_box = self.gui.flomni.new("TextBox")
self._flomnigui_update_progress()
@@ -222,6 +220,7 @@ if __name__ == "__main__":
client.start()
client.gui = BECGuiClient()
flomni_gui = flomniGuiTools(client)
flomni_gui = flomniGuiTools()
flomni_gui.set_client(client)
flomni_gui.flomnigui_show_gui()
flomni_gui.flomnigui_show_progress()

View File

@@ -5,9 +5,11 @@ import os
import time
from typing import TYPE_CHECKING
import numpy as np
from bec_lib import bec_logger
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
logger = bec_logger.logger
# import builtins to avoid linter errors
@@ -24,7 +26,7 @@ if TYPE_CHECKING:
class XrayEyeAlign:
# pixel calibration, multiply to get mm
labview=False
test_wo_movements = True
PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning
def __init__(self, client, flomni: Flomni) -> None:
@@ -36,209 +38,194 @@ class XrayEyeAlign:
self.flomni.reset_correction()
self.flomni.reset_tomo_alignment_fit()
@property
def gui(self):
return self.flomni.xeyegui
def _reset_init_values(self):
self.shift_xy = [0, 0]
self._xray_fov_xy = [0, 0]
def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self, keep_shutter_open=False):
def update_frame(self,keep_shutter_open=False):
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
if not self.labview:
self.flomni.flomnigui_show_xeyealign()
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
# self.flomni.flomnigui_show_xeyealign()
if not dev.cam_xeye.live_mode_enabled.get():
dev.cam_xeye.live_mode_enabled.put(True)
epics_put("XOMNYI-XEYE-ACQ:0", 1)
if self.labview:
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
self.gui.on_live_view_enabled(True)
fshopen()
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
dev.omnyfsh.fshopen()
time.sleep(0.5)
# stop live view
if not keep_shutter_open:
epics_put("XOMNYI-XEYE-ACQ:0", 0)
self.gui.on_live_view_enabled(False)
time.sleep(0.1)
fshclose()
print("got new frame")
dev.omnyfsh.fshclose()
print("Received new frame.")
else:
print("Staying in live view, shutter is and remains open!")
def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable
umv(self.device_manager.devices.fsamroy, val)
if not self.test_wo_movements:
umv(self.device_manager.devices.fsamroy, val)
def get_tomo_angle(self):
return self.device_manager.devices.fsamroy.readback.get()
def update_fov(self, k: int):
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
self._xray_fov_xy[0] = max(
getattr(dev.omny_xray_gui, f"width_x_{k}").get(), self._xray_fov_xy[0]
)
self._xray_fov_xy[1] = max(
getattr(dev.omny_xray_gui, f"width_y_{k}").get(), self._xray_fov_xy[1]
)
@property
def movement_buttons_enabled(self):
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
@movement_buttons_enabled.setter
def movement_buttons_enabled(self, enabled: bool):
enabled = int(enabled)
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
def movement_buttons_enabled(self, enablex: bool, enabley: bool):
self.gui.on_motors_enable(enablex, enabley)
def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
print(f"In alginment GUI: {msg}")
self.gui.user_message = msg
def align(self,keep_shutter_open=False):
def align(self, keep_shutter_open=False):
self.flomni.flomnigui_show_xeyealign()
if not keep_shutter_open:
print("This routine can be called with paramter keep_shutter_open=True to keep the shutter always open")
print(
"This routine can be called with paramter keep_shutter_open=True to keep the shutter always open"
)
self.send_message("Getting things ready. Please wait...")
#potential unresolved movement requests to zero
epics_put("XOMNYI-XEYE-MVX:0", 0)
epics_put("XOMNYI-XEYE-MVY:0", 0)
self.gui.enable_submit_button(False)
# Initialize xray align device
# clear potential pending movement requests
dev.omny_xray_gui.mvx.set(0)
dev.omny_xray_gui.mvy.set(0)
# reset submit channel
dev.omny_xray_gui.submit.set(0)
self.movement_buttons_enabled(False, False)
# reset shift xy and fov params
self._reset_init_values()
self.flomni.lights_off()
self.flomni.flomnigui_show_xeyealign()
self.flomni.flomnigui_raise()
# self.flomni.flomnigui_show_xeyealign()
# self.flomni.flomnigui_raise()
self.tomo_rotate(0)
epics_put("XOMNYI-XEYE-ANGLE:0", 0)
if not self.test_wo_movements:
self.tomo_rotate(0)
self.flomni.feye_in()
self.flomni.feye_in()
self.flomni.laser_tracker_on()
self.flomni.feedback_enable_with_reset()
# disable movement buttons
self.movement_buttons_enabled = False
self.movement_buttons_enabled(False, False)
sample_name = self.flomni.sample_get_name(0)
epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", sample_name)
self.gui.sample_name = sample_name
# this makes sure we are in a defined state
self.flomni.feedback_disable()
epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
if not self.test_wo_movements:
self.flomni.fosa_out()
self.flomni.fosa_out()
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in - 0.25)
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in - 0.25)
self.flomni.ffzp_in()
self.flomni.ffzp_in()
self.update_frame(keep_shutter_open)
# enable submit buttons
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-STEP:0", 0)
self.gui.enable_submit_button(True)
dev.omny_xray_gui.step.set(0).wait()
self.send_message("Submit center value of FZP.")
k = 0
while True:
if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") / 2 * self.PIXEL_CALIBRATION # in mm
self.alignment_values[k] = val_x
if dev.omny_xray_gui.submit.get() == 1:
self.alignment_values[k] = (
getattr(dev.omny_xray_gui, f"xval_x_{k}").get() / 2 * self.PIXEL_CALIBRATION
) # in mm
print(f"Clicked position {k}: x {self.alignment_values[k]}")
rtx_position = dev.rtx.readback.get() / 1000
print(f"Current rtx position {rtx_position}")
self.alignment_values[k] -= rtx_position
print(f"Corrected position {k}: x {self.alignment_values[k]}")
# reset submit channel
dev.omny_xray_gui.submit.set(0)
if k == 0: # received center value of FZP
self.send_message("please wait ...")
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.movement_buttons_enabled(False, False)
self.gui.enable_submit_button(False)
self.flomni.feedback_disable()
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in)
if not self.test_wo_movements:
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in)
self.flomni.foptics_out()
self.flomni.foptics_out()
self.flomni.feedback_disable()
umv(dev.fsamx, fsamx_in - 0.25)
time.sleep(0.5)
if self.labview:
self.update_frame(keep_shutter_open)
epics_put("XOMNYI-XEYE-RECBG:0", 1)
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
time.sleep(0.5)
print("waiting for background frame...")
umv(dev.fsamx, fsamx_in)
time.sleep(0.5)
self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open)
self.send_message("Adjust sample height and submit center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled = True
self.send_message("Step 1/5: Adjust sample height and submit center")
self.gui.enable_submit_button(True)
self.movement_buttons_enabled(True, True)
elif 1 <= k < 5: # received sample center value at samroy 0 ... 315
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
self.gui.enable_submit_button(False)
self.movement_buttons_enabled(False, False)
umv(dev.rtx, 0)
self.tomo_rotate(k * 45)
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
dev.omny_xray_gui.angle.set(self.get_tomo_angle())
self.update_frame(keep_shutter_open)
self.send_message("Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
self.send_message(f"Step {k+1}/5: Submit sample center")
self.gui.enable_submit_button(True)
self.movement_buttons_enabled(True, False)
self.update_fov(k)
elif k == 5: # received sample center value at samroy 270 and done
self.send_message("done...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.movement_buttons_enabled = False
self.gui.enable_submit_button(False)
self.movement_buttons_enabled(False, False)
self.update_fov(k)
break
k += 1
epics_put("XOMNYI-XEYE-STEP:0", k)
dev.omny_xray_gui.step.set(k)
_xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
_xrayeyalignmvx = dev.omny_xray_gui.mvx.get()
if _xrayeyalignmvx != 0:
umvr(dev.rtx, _xrayeyalignmvx)
print(f"Current rtx position {dev.rtx.readback.get() / 1000}")
epics_put("XOMNYI-XEYE-MVX:0", 0)
if k > 0:
epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000)
time.sleep(3)
dev.omny_xray_gui.mvx.set(0)
self.update_frame(keep_shutter_open)
if k < 2:
# allow movements, store movements to calculate center
_xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
_xrayeyalignmvy = dev.omny_xray_gui.mvy.get()
if _xrayeyalignmvy != 0:
self.flomni.feedback_disable()
umvr(dev.fsamy, _xrayeyalignmvy / 1000)
if not self.test_wo_movements:
umvr(dev.fsamy, _xrayeyalignmvy / 1000)
time.sleep(2)
epics_put("XOMNYI-XEYE-MVY:0", 0)
dev.omny_xray_gui.mvy.set(0)
self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open)
time.sleep(0.2)
time.sleep(0.1)
self.write_output()
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
@@ -248,22 +235,17 @@ class XrayEyeAlign:
umv(dev.rtx, 0)
# free camera
if self.labview:
epics_put("XOMNYI-XEYE-ACQ:0", 2)
if keep_shutter_open and not self.labview:
if self.flomni.OMNYTools.yesno("Close the shutter now?","y"):
fshclose()
epics_put("XOMNYI-XEYE-ACQ:0", 0)
if not self.labview:
self.flomni.flomnigui_idle()
if keep_shutter_open:
if self.flomni.OMNYTools.yesno("Close the shutter now?", "y"):
dev.omnyfsh.fshclose()
self.gui.on_live_view_enabled(False)
print("setting 'XOMNYI-XEYE-ACQ:0'")
print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy"
f" = {fovy:.0f} microns"
)
print("Use the matlab routine to FIT the current alignment...")
print("Check the fit in the GUI...")
print("Then LOAD ALIGNMENT PARAMETERS by running flomni.read_alignment_offset()\n")
@@ -271,9 +253,33 @@ class XrayEyeAlign:
file = os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues")
if not os.path.exists(file):
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, "w") as alignment_values_file:
alignment_values_file.write("angle\thorizontal\n")
# Initialize an empty list to store fovx values
fovx_list = []
fovx_offsets = np.zeros(5) # holds offsets for k = 1..5
for k in range(1, 6):
fovx_offset = self.alignment_values[0] - self.alignment_values[k]
fovx_offsets[k - 1] = fovx_offset # store in array
fovx_x = (k - 1) * 45
fovx_list.append([fovx_x, fovx_offset * 1000]) # Append the data to the list
print(f"Writing to file new alignment: number {k}, value x {fovx_offset}")
alignment_values_file.write(f"{(k-1)*45}\t{fovx_offset*1000}\n")
alignment_values_file.write(f"{fovx_x}\t{fovx_offset * 1000}\n")
# Now build final numpy array:
data = np.array(
[
[0, 45, 90, 135, 180], # angles
fovx_offsets * 1000, # fovx_offset values
[0, 0, 0, 0, 0],
]
)
self.gui.submit_fit_array(data)
print(f"fit submited with {data}")
print("todo mirko: submitted data is 1000 fold in amplitude")
# self.flomni.flomnigui_show_xeyealign_fittab()

View File

@@ -1,7 +1,5 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
@@ -21,7 +19,7 @@ class OMNYGuiTools:
def __init__(self, client):
self.gui = getattr(client, "gui", None)
self.gui_window = self.gui.windows['main'].widget
self.gui_window = self.gui.windows["main"].widget
self.fig200 = None
self.fig201 = None
self.fig202 = None
@@ -139,7 +137,9 @@ class OMNYGuiTools:
if self.progressbar is None:
self.omnygui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui_window.add_dock(name="progress").add_widget("RingProgressBar")
self.progressbar = self.gui_window.add_dock(name="progress").add_widget(
"RingProgressBar"
)
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value

View File

@@ -13,69 +13,10 @@ logger = bec_logger.logger
_Widgets = {
"OmnyAlignment": "OmnyAlignment",
"XRayEye": "XRayEye",
}
class OmnyAlignment(RPCBase):
@property
@rpc_call
def enable_live_view(self):
"""
None
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
None
"""
@property
@rpc_call
def user_message(self):
"""
None
"""
@user_message.setter
@rpc_call
def user_message(self):
"""
None
"""
@property
@rpc_call
def sample_name(self):
"""
None
"""
@sample_name.setter
@rpc_call
def sample_name(self):
"""
None
"""
@property
@rpc_call
def enable_move_buttons(self):
"""
None
"""
@enable_move_buttons.setter
@rpc_call
def enable_move_buttons(self):
"""
None
"""
class XRayEye(RPCBase):
@rpc_call
def active_roi(self) -> "BaseROI | None":
@@ -83,20 +24,6 @@ class XRayEye(RPCBase):
Return the currently active ROI, or None if no ROI is active.
"""
@property
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@property
@rpc_call
def user_message(self):
@@ -111,6 +38,30 @@ class XRayEye(RPCBase):
None
"""
@rpc_call
def on_live_view_enabled(self, enabled: "bool"):
"""
None
"""
@rpc_call
def on_motors_enable(self, x_enable: "bool", y_enable: "bool"):
"""
Enable/Disable motor controls
Args:
x_enable(bool): enable x motor controls
y_enable(bool): enable y motor controls
"""
@rpc_call
def enable_submit_button(self, enable: "bool"):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
@property
@rpc_call
def sample_name(self):
@@ -139,6 +90,18 @@ class XRayEye(RPCBase):
None
"""
@rpc_call
def switch_tab(self, tab: "str"):
"""
None
"""
@rpc_call
def submit_fit_array(self, fit_array):
"""
None
"""
class XRayEye2DControl(RPCBase):
@rpc_call
@@ -146,3 +109,15 @@ class XRayEye2DControl(RPCBase):
"""
Cleanup the BECConnector
"""
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""

View File

@@ -1,140 +0,0 @@
from typing import TypedDict
from bec_widgets.utils.error_popups import SafeSlot
import os
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QWidget, QPushButton, QLineEdit, QLabel, QVBoxLayout
from bec_qthemes import material_icon
from bec_lib.logger import bec_logger
logger = bec_logger.logger
# class OmnyAlignmentUIComponents(TypedDict):
# moveRightButton: QPushButton
# moveLeftButton: QPushButton
# moveUpButton: QPushButton
# moveDownButton: QPushButton
# image: Image
class OmnyAlignment(BECWidget, QWidget):
USER_ACCESS = ["enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter","sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
ui_file = "./omny_alignment.ui"
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.ui.moveRightButton.setText("")
self.ui.moveRightButton.setIcon(
material_icon(icon_name="keyboard_arrow_right", **icon_options)
)
self.ui.moveLeftButton.setText("")
self.ui.moveLeftButton.setIcon(
material_icon(icon_name="keyboard_arrow_left", **icon_options)
)
self.ui.moveUpButton.setText("")
self.ui.moveUpButton.setIcon(
material_icon(icon_name="keyboard_arrow_up", **icon_options)
)
self.ui.moveDownButton.setText("")
self.ui.moveDownButton.setIcon(
material_icon(icon_name="keyboard_arrow_down", **icon_options)
)
self.ui.confirmButton.setText("OK")
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
@property
def enable_live_view(self):
return self.ui.liveViewSwitch.checked
@enable_live_view.setter
def enable_live_view(self, enable:bool):
self.ui.liveViewSwitch.checked = enable
@property
def user_message(self):
return self.ui.messageLineEdit.text()
@user_message.setter
def user_message(self, message:str):
self.ui.messageLineEdit.setText(message)
@property
def sample_name(self):
return self.ui.sampleLineEdit.text()
@sample_name.setter
def sample_name(self, message:str):
self.ui.sampleLineEdit.setText(message)
@SafeSlot(bool)
def on_live_view_enabled(self, enabled:bool):
from bec_widgets.widgets.plots.image.image import Image
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam_xeye")
return
image.disconnect_monitor("cam_xeye")
@property
def enable_move_buttons(self):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
return move_up.isEnabled() and move_down.isEnabled() and move_left.isEnabled() and move_right.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled:bool):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
move_up.setEnabled(enabled)
move_down.setEnabled(enabled)
move_left.setEnabled(enabled)
move_right.setEnabled(enabled)
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
widget = OmnyAlignment()
widget.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['omny_alignment.py']}

View File

@@ -1,125 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>988</width>
<height>821</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="2" column="2">
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="2">
<widget class="QPushButton" name="moveRightButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QPushButton" name="moveLeftButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QPushButton" name="moveUpButton">
<property name="text">
<string>Up</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QPushButton" name="moveDownButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QPushButton" name="confirmButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="2" column="0">
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="1">
<widget class="QLineEdit" name="sampleLineEdit"/>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="messageLineEdit"/>
</item>
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Sample</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Message</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="1" column="0" colspan="3">
<widget class="Image" name="image">
<property name="enable_toolbar" stdset="0">
<bool>false</bool>
</property>
<property name="inner_axes" stdset="0">
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam_xeye</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>
</property>
</widget>
</item>
<item row="0" column="3">
<widget class="ToggleSwitch" name="liveViewSwitch"/>
</item>
<item row="0" column="2">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Live View</string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
</property>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>Image</class>
<extends>QWidget</extends>
<header>image</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1,54 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment import OmnyAlignment
DOM_XML = """
<ui language='c++'>
<widget class='OmnyAlignment' name='omny_alignment'>
</widget>
</ui>
"""
class OmnyAlignmentPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = OmnyAlignment(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(OmnyAlignment.ICON_NAME)
def includeFile(self):
return "omny_alignment"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "OmnyAlignment"
def toolTip(self):
return "OmnyAlignment"
def whatsThis(self):
return self.toolTip()

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment_plugin import OmnyAlignmentPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(OmnyAlignmentPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -5,6 +5,7 @@ from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
@@ -21,7 +22,10 @@ from qtpy.QtWidgets import (
QToolButton,
QVBoxLayout,
QWidget,
QTextEdit,
QTabWidget,
)
import time
logger = bec_logger.logger
CAMERA = ("cam_xeye", "image")
@@ -33,46 +37,48 @@ class XRayEye2DControl(BECWidget, QWidget):
self.get_bec_shortcuts()
self._step_size = step_size
self.root_layout = QGridLayout(self)
self.setStyleSheet("""
self.setStyleSheet(
"""
QToolButton {
border: 1px solid;
border-radius: 4px;
}
""")
"""
)
# Up
self.move_up_button = QToolButton(parent=self)
self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
self.move_up_button.setIcon(material_icon("keyboard_double_arrow_up"))
self.root_layout.addWidget(self.move_up_button, 0, 2)
# Up tweak button
self.move_up_tweak_button = QToolButton(parent=self)
self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
self.move_up_tweak_button.setIcon(material_icon("keyboard_arrow_up"))
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
# Left
self.move_left_button = QToolButton(parent=self)
self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
self.move_left_button.setIcon(material_icon("keyboard_double_arrow_left"))
self.root_layout.addWidget(self.move_left_button, 2, 0)
# Left tweak button
self.move_left_tweak_button = QToolButton(parent=self)
self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
self.move_left_tweak_button.setIcon(material_icon("keyboard_arrow_left"))
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
# Right
self.move_right_button = QToolButton(parent=self)
self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
self.move_right_button.setIcon(material_icon("keyboard_double_arrow_right"))
self.root_layout.addWidget(self.move_right_button, 2, 4)
# Right tweak button
self.move_right_tweak_button = QToolButton(parent=self)
self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
self.move_right_tweak_button.setIcon(material_icon("keyboard_arrow_right"))
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
# Down
self.move_down_button = QToolButton(parent=self)
self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
self.move_down_button.setIcon(material_icon("keyboard_double_arrow_down"))
self.root_layout.addWidget(self.move_down_button, 4, 2)
# Down tweak button
self.move_down_tweak_button = QToolButton(parent=self)
self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
self.move_down_tweak_button.setIcon(material_icon("keyboard_arrow_down"))
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
# Connections
@@ -124,8 +130,20 @@ class XRayEye2DControl(BECWidget, QWidget):
class XRayEye(BECWidget, QWidget):
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
USER_ACCESS = [
"active_roi",
"user_message",
"user_message.setter",
"on_live_view_enabled",
"on_motors_enable",
"enable_submit_button",
"sample_name",
"sample_name.setter",
"enable_move_buttons",
"enable_move_buttons.setter",
"switch_tab",
"submit_fit_array",
]
PLUGIN = True
def __init__(self, parent=None, **kwargs):
@@ -136,28 +154,44 @@ class XRayEye(BECWidget, QWidget):
self._make_connections()
# Connection to redis endpoints
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
self.bec_dispatcher.connect_slot(
self.getting_shutter_status, MessageEndpoints.device_readback("omnyfsh")
)
self.bec_dispatcher.connect_slot(
self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0])
)
self.connect_motors()
self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger)
def _init_ui(self):
self.core_layout = QHBoxLayout(self)
self.root_layout = QVBoxLayout(self)
self.tab_widget = QTabWidget(parent=self)
self.root_layout.addWidget(self.tab_widget)
self.image = Image(parent=self)
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
self.alignment_tab = QWidget(parent=self)
self.core_layout = QHBoxLayout(self.alignment_tab)
self.image = Image(parent=self.alignment_tab)
self.image.color_map = "CET-L2"
self.image.enable_toolbar = (
False # Disable default toolbar to not allow to user set anything
)
self.image.inner_axes = False # Disable inner axes to maximize image area
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
self.image.enable_full_colorbar = True
self.image.invert_y = True # Invert y axis to match image coordinates
# Control panel on the right: vertical layout inside a fixed-width widget
self.control_panel = QWidget(parent=self)
self.control_panel = QWidget(parent=self.alignment_tab)
self.control_panel_layout = QVBoxLayout(self.control_panel)
self.control_panel_layout.setContentsMargins(0, 0, 0, 0)
self.control_panel_layout.setSpacing(10)
# ROI toolbar + Live toggle (header row)
self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
compact_orientation="horizontal")
self.roi_manager = ROIPropertyTree(
parent=self, image_widget=self.image, compact=True, compact_orientation="horizontal"
)
header_row = QHBoxLayout()
header_row.setContentsMargins(0, 0, 0, 0)
header_row.setSpacing(8)
@@ -166,16 +200,36 @@ class XRayEye(BECWidget, QWidget):
self.live_preview_label = QLabel("Live Preview", parent=self)
self.live_preview_toggle = ToggleSwitch(parent=self)
self.live_preview_toggle.checked = False
header_row.addWidget(self.live_preview_label, 0, Qt.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignVCenter)
header_row.addWidget(self.live_preview_label, 0, Qt.AlignmentFlag.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
self.control_panel_layout.addLayout(header_row)
switch_row = QHBoxLayout()
switch_row.setContentsMargins(0, 0, 0, 0)
switch_row.setSpacing(8)
switch_row.addStretch()
self.camera_running_label = QLabel("Camera running", parent=self)
self.camera_running_toggle = ToggleSwitch(parent=self)
# self.camera_running_toggle.checked = False
self.camera_running_toggle.enabled.connect(self.camera_running_enabled)
self.shutter_label = QLabel("Shutter open", parent=self)
self.shutter_toggle = ToggleSwitch(parent=self)
# self.shutter_toggle.checked = False
self.shutter_toggle.enabled.connect(self.opening_shutter)
switch_row.addWidget(self.shutter_label, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.shutter_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.camera_running_label, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.camera_running_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
self.control_panel_layout.addLayout(switch_row)
# separator
self.control_panel_layout.addWidget(self._create_separator())
# 2D Positioner (fixed size)
self.motor_control_2d = XRayEye2DControl(parent=self)
self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignTop | Qt.AlignCenter)
self.control_panel_layout.addWidget(
self.motor_control_2d, 0, Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignCenter
)
# separator
self.control_panel_layout.addWidget(self._create_separator())
@@ -190,9 +244,8 @@ class XRayEye(BECWidget, QWidget):
# Submit button
self.submit_button = QPushButton("Submit", parent=self)
# Add to layout form
step_size_form.addWidget(QLabel("Horizontal", parent=self), 0, 0)
step_size_form.addWidget(QLabel("Step Size", parent=self), 0, 0)
step_size_form.addWidget(self.step_size, 0, 1)
step_size_form.addWidget(QLabel("Vertical", parent=self), 1, 0)
step_size_form.addWidget(self.submit_button, 2, 0, 1, 2)
# Add form to control panel
@@ -207,7 +260,8 @@ class XRayEye(BECWidget, QWidget):
self.sample_name_line_edit.setReadOnly(True)
form.addWidget(QLabel("Sample", parent=self), 0, 0)
form.addWidget(self.sample_name_line_edit, 0, 1)
self.message_line_edit = QLineEdit(parent=self)
self.message_line_edit = QTextEdit(parent=self)
self.message_line_edit.setFixedHeight(60)
self.message_line_edit.setReadOnly(True)
form.addWidget(QLabel("Message", parent=self), 1, 0)
form.addWidget(self.message_line_edit, 1, 1)
@@ -217,12 +271,46 @@ class XRayEye(BECWidget, QWidget):
self.control_panel.adjustSize()
p_hint = self.control_panel.sizeHint()
self.control_panel.setFixedWidth(p_hint.width())
self.control_panel.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
self.control_panel.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Expanding)
# Core Layout: image (expanding) | control panel (fixed)
self.core_layout.addWidget(self.image)
self.core_layout.addWidget(self.control_panel)
self.tab_widget.addTab(self.alignment_tab, "Alignment")
self.fit_tab = QWidget(parent=self)
self.fit_layout = QVBoxLayout(self.fit_tab)
self.waveform_x = Waveform(parent=self.fit_tab)
self.waveform_y = Waveform(parent=self.fit_tab)
self.waveform_x.plot(
x=[0],
y=[1],
label="fit-x",
dap="SineModel",
dap_parameters={
"frequency": {"value": 0.0174533, "vary": False, "min": 0.01, "max": 0.02}
},
dap_oversample=5,
)
self.waveform_y.plot(x=[0], y=[2], label="fit-y", dap="SineModel") # ,dap_oversample=5)
self.fit_x = self.waveform_x.curves[0]
self.fit_y = self.waveform_y.curves[0]
self.waveform_x.dap_params_update.connect(self.on_dap_params)
self.waveform_y.dap_params_update.connect(self.on_dap_params)
for wave in (self.waveform_x, self.waveform_y):
wave.x_label = "Angle (deg)"
wave.x_grid = True
wave.y_grid = True
wave.enable_toolbar = True
self.fit_layout.addWidget(self.waveform_x)
self.fit_layout.addWidget(self.waveform_y)
self.tab_widget.addTab(self.fit_tab, "Fit")
def _make_connections(self):
# Fetch initial state
self.on_live_view_enabled(True)
@@ -230,31 +318,36 @@ class XRayEye(BECWidget, QWidget):
# Make connections
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
self.step_size.valueChanged.connect(
lambda x: self.motor_control_2d.setProperty("step_size", x)
)
self.submit_button.clicked.connect(self.submit)
def _create_separator(self):
sep = QFrame(parent=self)
sep.setFrameShape(QFrame.HLine)
sep.setFrameShadow(QFrame.Sunken)
sep.setFrameShape(QFrame.Shape.HLine)
sep.setFrameShadow(QFrame.Shadow.Sunken)
sep.setLineWidth(1)
return sep
def _init_gui_trigger(self):
self.dev.omny_xray_gui.read()
self.dev.omnyfsh.read()
################################################################################
# Device Connection logic
################################################################################
def connect_motors(self):
""" Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
"""Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ["osamroy", "lsamrot", "fsamroy"]
for motor in possible_motors:
if motor in self.dev:
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
logger.info(f"Succesfully connected to {motor}")
self.bec_dispatcher.connect_slot(
self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor)
)
logger.info(f"Successfully connected to {motor}")
################################################################################
# Properties ported from the original OmnyAlignment, can be adjusted as needed
@@ -291,6 +384,13 @@ class XRayEye(BECWidget, QWidget):
# Slots ported from the original OmnyAlignment, can be adjusted as needed
################################################################################
@SafeSlot(str)
def switch_tab(self, tab: str):
if tab == "fit":
self.tab_widget.setCurrentIndex(1)
else:
self.tab_widget.setCurrentIndex(0)
@SafeSlot()
def get_roi_coordinates(self) -> dict | None:
"""Get the coordinates of the currently active ROI."""
@@ -307,14 +407,48 @@ class XRayEye(BECWidget, QWidget):
self.live_preview_toggle.blockSignals(True)
if enabled:
self.live_preview_toggle.checked = enabled
self.image.image(CAMERA)
self.image.image(device=CAMERA[0], signal=CAMERA[1])
self.live_preview_toggle.blockSignals(False)
return
self.image.disconnect_monitor(CAMERA)
self.image.disconnect_monitor(CAMERA[0], CAMERA[1])
self.live_preview_toggle.checked = enabled
self.live_preview_toggle.blockSignals(False)
@SafeSlot(bool)
def camera_running_enabled(self, enabled: bool):
logger.info(f"Camera running: {enabled}")
self.camera_running_toggle.blockSignals(True)
self.dev.get(CAMERA[0]).live_mode_enabled.put(enabled)
self.camera_running_toggle.checked = enabled
self.camera_running_toggle.blockSignals(False)
@SafeSlot(dict, dict)
def getting_camera_status(self, data, meta):
print(f"msg:{data}")
live_mode_enabled = data.get("signals").get(f"{CAMERA[0]}_live_mode_enabled").get("value")
self.camera_running_toggle.blockSignals(True)
self.camera_running_toggle.checked = live_mode_enabled
self.camera_running_toggle.blockSignals(False)
@SafeSlot(bool)
def opening_shutter(self, enabled: bool):
logger.info(f"Shutter changed from GUI to: {enabled}")
self.shutter_toggle.blockSignals(True)
if enabled:
self.dev.omnyfsh.fshopen()
else:
self.dev.omnyfsh.fshclose()
# self.shutter_toggle.checked = enabled
self.shutter_toggle.blockSignals(False)
@SafeSlot(dict, dict)
def getting_shutter_status(self, data, meta):
shutter_open = bool(data.get("signals").get("omnyfsh_shutter").get("value"))
self.shutter_toggle.blockSignals(True)
self.shutter_toggle.checked = shutter_open
self.shutter_toggle.blockSignals(False)
@SafeSlot(bool, bool)
def on_motors_enable(self, x_enable: bool, y_enable: bool):
"""
@@ -327,98 +461,108 @@ class XRayEye(BECWidget, QWidget):
self.motor_control_2d.enable_controls_hor(x_enable)
self.motor_control_2d.enable_controls_ver(y_enable)
@SafeSlot(int)
def enable_submit_button(self, enable: int):
@SafeSlot(bool)
def enable_submit_button(self, enable: bool):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
if enable == -1:
self.submit_button.setEnabled(False)
else:
if enable:
self.submit_button.setEnabled(True)
else:
self.submit_button.setEnabled(False)
@SafeSlot(dict, dict)
def on_dap_params(self, data, meta):
print("#######################################")
print("getting dap parameters")
print(f"data: {data}")
print(f"meta: {meta}")
self.waveform_x.auto_range(True)
self.waveform_y.auto_range(True)
# self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
curve_id = meta.get("curve_id")
if curve_id == "fit-x-SineModel":
self.dev.omny_xray_gui.fit_params_x.set(data).wait()
print(f"setting x data to {data}")
else:
self.dev.omny_xray_gui.fit_params_y.set(data).wait()
print(f"setting y data to {data}")
# self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
@SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict):
#TODO implement if needed
# TODO implement if needed
print(f"data: {data}")
print(f"meta: {meta}")
@SafeSlot(dict, dict)
def device_updates(self, data: dict, meta: dict):
"""
Slot to handle device updates from omny_xray_gui device.
Args:
data(dict): data from device
meta(dict): metadata from device
"""
signals = data.get('signals')
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
self.on_live_view_enabled(bool(enable_live_preview))
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
# Signals from epics gui device
# send message
user_message = signals.get("omny_xray_gui_send_message").get('value')
self.user_message = user_message
# sample name
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
self.sample_name = sample_message
# enable frame acquisition
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
self.on_live_view_enabled(bool(update_frame_acq))
# enable submit button
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
self.enable_submit_button(enable_submit_button)
@SafeSlot()
def submit_fit_array(self, fit_array):
self.tab_widget.setCurrentIndex(1)
# self.fix_x.title = " got fit array"
print(f"got fit array {fit_array}")
self.waveform_x.curves[0].set_data(x=fit_array[0], y=fit_array[1])
# self.fit_x.set_data(x=fit_array[0],y=fit_array[1])
# self.fit_y.set_data(x=fit_array[0],y=fit_array[2])
@SafeSlot()
def submit(self):
"""Execute submit action by submit button."""
print("submit pushed")
self.submit_button.blockSignals(True)
if self.roi_manager.single_active_roi is None:
logger.warning("No active ROI")
return
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
roi_center_x = roi_coordinates['center_x']
roi_center_y = roi_coordinates['center_y']
roi_center_x = roi_coordinates["center_x"]
roi_center_y = roi_coordinates["center_y"]
# Case of rectangular ROI
if isinstance(self.roi_manager.single_active_roi, RectangularROI):
roi_width = roi_coordinates['width']
roi_height = roi_coordinates['height']
roi_width = roi_coordinates["width"]
roi_height = roi_coordinates["height"]
elif isinstance(self.roi_manager.single_active_roi, CircularROI):
roi_width = roi_coordinates['diameter']
roi_height = roi_coordinates['radius']
roi_width = roi_coordinates["diameter"]
roi_height = roi_coordinates["radius"]
else:
logger.warning("Unsupported ROI type for submit action.")
return
print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
print(
f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}"
) # TODO remove when will be not needed for debugging
# submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get("value"))
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
xval_x = getattr(self.dev.omny_xray_gui, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1)
print("submit done")
self.submit_button.blockSignals(False)
def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
getattr(self.dev,CAMERA[0]).live_mode = False
self.bec_dispatcher.disconnect_slot(
self.device_updates, MessageEndpoints.device_readback("omny_xray_gui")
)
getattr(self.dev, CAMERA[0]).live_mode = False
super().cleanup()
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.colors import apply_theme
app = QApplication(sys.argv)
apply_theme("light")
dispatcher = BECDispatcher(gui_id="xray")
win = XRayEye()
win.resize(1000, 800)

View File

@@ -0,0 +1,480 @@
############################################################
#################### flOMNI Galil motors ###################
############################################################
feyex:
description: Xray eye X
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: D
host: mpc2844.psi.ch
limits:
- -30
- -1
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -16.267
out: -1
feyey:
description: Xray eye Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: E
host: mpc2844.psi.ch
limits:
- -1
- -10
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -10.467
fheater:
description: Heater Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: C
host: mpc2844.psi.ch
limits:
- -15
- 0
port: 8082
sign: -1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
foptx:
description: Optics X
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: B
host: mpc2844.psi.ch
limits:
- -17
- -12
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -13.761
fopty:
description: Optics Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: F
host: mpc2844.psi.ch
limits:
- 0
- 4
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0.552
out: 0.752
foptz:
description: Optics Z
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: A
host: mpc2844.psi.ch
limits:
- 0
- 27
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 23
fsamroy:
description: Sample rotation
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: A
host: mpc2844.psi.ch
limits:
- -5
- 365
port: 8084
sign: -1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
fsamx:
description: Sample coarse X
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: E
host: mpc2844.psi.ch
limits:
- -162
- 0
port: 8081
sign: 1
enabled: true
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: -1.1
fsamy:
description: Sample coarse Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: F
host: mpc2844.psi.ch
limits:
- 2
- 3.1
port: 8081
sign: 1
enabled: true
onFailure: buffer
readOnly: true
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 2.75
ftracky:
description: Laser Tracker coarse Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: H
host: mpc2844.psi.ch
limits:
- 2.2
- 2.8
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftrackz:
description: Laser Tracker coarse Z
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: G
host: mpc2844.psi.ch
limits:
- 4.5
- 5.5
port: 8082
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftransx:
description: Sample transer X
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: C
host: mpc2844.psi.ch
limits:
- 0
- 50
port: 8081
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftransy:
description: Sample transer Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: A
host: mpc2844.psi.ch
limits:
- -100
- 0
port: 8081
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
sensor_voltage: -2.4
ftransz:
description: Sample transer Z
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: B
host: mpc2844.psi.ch
limits:
- 0
- 145
port: 8081
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
ftray:
description: Sample transfer tray
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: D
host: mpc2844.psi.ch
limits:
- -200
- 0
port: 8081
sign: -1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
############################################################
#################### flOMNI Sample Names ###################
############################################################
flomni_samples:
description: Sample names and storage
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimSampleStorage
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
#################### flOMNI Smaract motors #################
############################################################
fosax:
description: OSA X
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: A
host: mpc2844.psi.ch
limits:
- 10.2
- 10.6
port: 3334
sign: -1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 9.124
out: 5.3
fosay:
description: OSA Y
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: B
host: mpc2844.psi.ch
limits:
- -3.1
- -2.9
port: 3334
sign: -1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 0.367
fosaz:
description: OSA Z
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: C
host: mpc2844.psi.ch
limits:
- -6
- -4
port: 3334
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
in: 8.5
out: 6
############################################################
#################### flOMNI RT motors ######################
############################################################
rtx:
description: flomni rt
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: A
host: mpc2844.psi.ch
port: 2222
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
userParameter:
low_signal: 10000
min_signal: 9000
rt_pid_voltage: -0.06219
rty:
description: flomni rt
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: B
host: mpc2844.psi.ch
port: 2222
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
userParameter:
tomo_additional_offsety: 0
rtz:
description: flomni rt
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimPositioner
deviceConfig:
axis_Id: C
host: mpc2844.psi.ch
port: 2222
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
connectionTimeout: 20
############################################################
####################### Cameras ############################
############################################################
cam_flomni_gripper:
description: Camera sample changer
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera
deviceConfig:
url: http://flomnicamserver:5000/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_flomni_overview:
description: Camera flomni overview
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera
deviceConfig:
url: http://flomnicamserver:5001/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_xeye:
description: Camera flOMNI Xray eye ID1
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera
deviceConfig:
camera_id: 1
bits_per_pixel: 24
num_rotation_90: 3
transpose: false
force_monochrome: true
m_n_colormode: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async
# cam_ids_rgb:
# description: Camera flOMNI Xray eye ID203
# deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimCamera
# deviceConfig:
# camera_id: 203
# bits_per_pixel: 24
# num_rotation_90: 2
# transpose: false
# force_monochrome: false
# m_n_colormode: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: async
# ############################################################
# ################### flOMNI temperatures ####################
# ############################################################
flomni_temphum:
description: flOMNI Temperatures and humidity
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimTempHum
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
# ############################################################
# ########## OMNY / flOMNI / LamNI fast shutter ##############
# ############################################################
omnyfsh:
description: omnyfsh connects to fast shutter at X12 if device fsh exists
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimFastShutter
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
#################### GUI Signals ###########################
############################################################
omny_xray_gui:
description: Gui signals
deviceClass: csaxs_bec.devices.sim.flomni_sim.FlomniSimXRayAlignGUI
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request

View File

@@ -435,9 +435,9 @@ cam_xeye:
# deviceConfig:
# camera_id: 203
# bits_per_pixel: 24
# num_rotation_90: 3
# num_rotation_90: 2
# transpose: false
# force_monochrome: true
# force_monochrome: false
# m_n_colormode: 1
# enabled: true
# onFailure: buffer
@@ -472,7 +472,7 @@ omnyfsh:
############################################################
omny_xray_gui:
description: Gui signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayAlignGUI
deviceConfig: {}
enabled: true
onFailure: buffer

View File

@@ -0,0 +1,149 @@
"""Module for the EPICS integration of the AlliedVision Camera via Vimba SDK."""
import threading
import traceback
from enum import IntEnum
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt, Kind, Signal
from ophyd.areadetector import ADComponent as ADCpt
from ophyd.areadetector import DetectorBase
from ophyd_devices import PreviewSignal
from ophyd_devices.devices.areadetector.cam import VimbaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
logger = bec_logger.logger
class ACQUIRE_MODES(IntEnum):
"""Acquiring enums for Allied Vision Camera"""
ACQUIRING = 1
DONE = 0
class AlliedVisionCamera(PSIDeviceBase, DetectorBase):
"""
Epics Area Detector interface for the Allied Vision Alvium G1-507m camera via Vimba SDK.
The IOC runs with under the prefix: 'X12SA-GIGECAM-AV1:'.
"""
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
cam = ADCpt(VimbaDetectorCam, "cam1:")
image = ADCpt(ImagePlugin, "image1:")
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=0,
doc="Preview signal of the AlliedVision camera.",
)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
def __init__(
self,
*,
name: str,
prefix: str,
poll_rate: int = 5,
scan_info=None,
device_manager=None,
**kwargs,
):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._poll_thread = threading.Thread(
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
self._poll_thread_kill_event = threading.Event()
self._poll_start_event = threading.Event()
if poll_rate > 10:
logger.warning(f"Poll rate too high for Camera {self.name}, setting to 10 Hz max.")
poll_rate = 10
self._poll_rate = poll_rate
self._unique_array_id = 0
self._pv_timeout = 2.0
self.image: ImagePlugin
self._live_mode_lock = threading.RLock()
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None:
"""Start live mode."""
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
"""Stop live mode."""
self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
self._apply_live_mode(bool(value))
def _apply_live_mode(self, enabled: bool) -> None:
with self._live_mode_lock:
if enabled:
if not self._poll_start_event.is_set():
self._poll_start_event.set()
self.cam.acquire.put(ACQUIRE_MODES.ACQUIRING.value) # Start acquisition
else:
logger.info(f"Live mode already started for {self.name}.")
return
if self._poll_start_event.is_set():
self._poll_start_event.clear()
self.cam.acquire.put(ACQUIRE_MODES.DONE.value) # Stop acquisition
else:
logger.info(f"Live mode already stopped for {self.name}.")
def on_connected(self):
"""Reset the unique array ID on connection."""
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout)
self._poll_thread.start()
def _poll_array_data(self):
"""Poll the array data for preview updates."""
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
while self._poll_start_event.wait():
try:
# First check if there is a new image
if self.image.unique_id.get() != self._unique_array_id:
self._unique_array_id = self.image.unique_id.get()
else:
continue # No new image, skip update
# Get new image data
value = self.image.array_data.get()
if value is None:
logger.info(f"No image data available for preview of {self.name}")
continue
array_size = self.image.array_size.get()
if array_size[0] == 0: # 2D image, not color image
array_size = array_size[1:]
# Geometry correction for the image
data = np.reshape(value, array_size)
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while polling array data for preview of {self.name}: {content}"
)
def on_destroy(self):
"""Stop the polling thread on destruction."""
self._poll_thread_kill_event.set()
self._poll_start_event.set()
if self._poll_thread.is_alive():
self._poll_thread.join(timeout=2)

View File

@@ -156,7 +156,6 @@ class Camera:
camera_id (int): The ID of the camera device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
"""
def __init__(

View File

@@ -3,21 +3,19 @@
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
from typing import TYPE_CHECKING, Literal
import numpy as np
from ophyd import Component as Cpt, Signal, Kind
from bec_lib import messages
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger
@@ -45,8 +43,15 @@ class IDSCamera(PSIDeviceBase):
doc="Signal for the region of interest (ROI).",
async_update={"type": "add", "max_shape": [None]},
)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
USER_ACCESS = ["start_live_mode", "stop_live_mode", "mask", "set_rect_roi", "get_last_image"]
def __init__(
self,
@@ -83,15 +88,22 @@ class IDSCamera(PSIDeviceBase):
bits_per_pixel=bits_per_pixel,
connect=False,
)
self._live_mode = False
self._inputs = {"live_mode": live_mode}
self._mask = np.zeros((1, 1), dtype=np.uint8)
self.image.num_rotation_90 = num_rotation_90
self.image.transpose = transpose
self._force_monochrome = force_monochrome
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
self.live_mode_enabled.put(bool(live_mode))
############## Live Mode Methods ##############
def start_live_mode(self) -> None:
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
self.live_mode_enabled.put(False)
@property
def mask(self) -> np.ndarray:
"""Return the current region of interest (ROI) for the camera."""
@@ -114,22 +126,15 @@ class IDSCamera(PSIDeviceBase):
)
self._mask = value
@property
def live_mode(self) -> bool:
"""Return whether the camera is in live mode."""
return self._live_mode
@live_mode.setter
def live_mode(self, value: bool):
"""Set the live mode for the camera."""
if value != self._live_mode:
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
self._stop_live()
def _on_live_mode_enabled_changed(self, *args, value, **kwargs):
"""Callback for when live mode is changed."""
enabled = bool(value)
if enabled and self.cam._connected is False: # pylint: disable=protected-access
self.cam.on_connect()
if enabled:
self._start_live()
else:
self._stop_live()
def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera."""
@@ -196,7 +201,7 @@ class IDSCamera(PSIDeviceBase):
"""Connect to the camera."""
self.cam.force_monochrome = self._force_monochrome
self.cam.on_connect()
self.live_mode = self._inputs.get("live_mode", False)
self.live_mode_enabled.put(bool(self._inputs.get("live_mode", False)))
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
def on_destroy(self):
@@ -206,7 +211,7 @@ class IDSCamera(PSIDeviceBase):
def on_trigger(self):
"""Handle the trigger event."""
if not self.live_mode:
if not bool(self.live_mode_enabled.get()):
return
image = self.image.get()
if image is not None:

View File

@@ -2,7 +2,7 @@ import requests
import threading
import cv2
import numpy as np
from ophyd import Device, Component as Cpt
from ophyd import Device, Component as Cpt, Kind, Signal
from ophyd_devices import PreviewSignal
import traceback
@@ -13,6 +13,13 @@ logger = bec_logger.logger
class WebcamViewer(Device):
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None:
super().__init__(name=name, **kwargs)
@@ -21,20 +28,54 @@ class WebcamViewer(Device):
self._update_thread = None
self._buffer = b""
self._shutdown_event = threading.Event()
self._live_mode_lock = threading.RLock()
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None:
if self._connection is not None:
return
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
self._apply_live_mode(bool(value))
def _apply_live_mode(self, enabled: bool) -> None:
with self._live_mode_lock:
if enabled:
if self._update_thread is not None and self._update_thread.is_alive():
return
self._shutdown_event.clear()
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
return
if self._update_thread is None:
return
self._shutdown_event.set()
if self._connection is not None:
try:
self._connection.close()
except Exception: # pylint: disable=broad-except
pass
self._connection = None
self._update_thread.join(timeout=2)
if self._update_thread.is_alive():
logger.warning("Webcam live mode thread did not stop within timeout.")
return
self._update_thread = None
self._buffer = b""
self._shutdown_event.clear()
def _update_loop(self) -> None:
while not self._shutdown_event.is_set():
try:
self._connection = requests.get(self.url, stream=True)
self._connection = requests.get(self.url, stream=True, timeout=5)
for chunk in self._connection.iter_content(chunk_size=1024):
if self._shutdown_event.is_set():
break
self._buffer += chunk
start = self._buffer.find(b'\xff\xd8') # JPEG start
end = self._buffer.find(b'\xff\xd9') # JPEG end
@@ -50,16 +91,3 @@ class WebcamViewer(Device):
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Image update loop failed: {content}")
def stop_live_mode(self) -> None:
if self._connection is None:
return
self._shutdown_event.set()
if self._connection is not None:
self._connection.close()
self._connection = None
if self._update_thread is not None:
self._update_thread.join()
self._update_thread = None
self._shutdown_event.clear()

View File

@@ -1,74 +1,46 @@
from ophyd import Component as Cpt
import numpy as np
from ophyd import Component as Cpt, Signal, EpicsSignal
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
class OMNYXRayAlignGUI(Device):
class OMNYXRayEpicsGUI(Device):
save_frame = Cpt(
EpicsSignal, name="save_frame", read_pv="XOMNYI-XEYE-SAVFRAME:0",auto_monitor=True
)
update_frame_acqdone = Cpt(
EpicsSignal, name="update_frame_acqdone", read_pv="XOMNYI-XEYE-ACQDONE:0",auto_monitor=True
)
update_frame_acq = Cpt(
EpicsSignal, name="update_frame_acq", read_pv="XOMNYI-XEYE-ACQ:0",auto_monitor=True
)
width_y_dynamic = {
f"width_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YWIDTH_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_y = Dcpt(width_y_dynamic)
width_x_dynamic = {
f"width_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XWIDTH_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_x = Dcpt(width_x_dynamic)
enable_mv_x = Cpt(
EpicsSignal, name="enable_mv_x", read_pv="XOMNYI-XEYE-ENAMVX:0",auto_monitor=True
)
enable_mv_y = Cpt(
EpicsSignal, name="enable_mv_y", read_pv="XOMNYI-XEYE-ENAMVY:0",auto_monitor=True
)
send_message = Cpt(
EpicsSignal, name="send_message", read_pv="XOMNYI-XEYE-MESSAGE:0.DESC",auto_monitor=True
)
sample_name = Cpt(
EpicsSignal, name="sample_name", read_pv="XOMNYI-XEYE-SAMPLENAME:0.DESC",auto_monitor=True
)
angle = Cpt(
EpicsSignal, name="angle", read_pv="XOMNYI-XEYE-ANGLE:0",auto_monitor=True
)
pixel_size = Cpt(
EpicsSignal, name="pixel_size", read_pv="XOMNYI-XEYE-PIXELSIZE:0",auto_monitor=True
)
update_frame_acqdone = Cpt(Signal, value=0)
update_frame_acq = Cpt(Signal, value=0)
enable_mv_x = Cpt(Signal, value=0)
enable_mv_y = Cpt(Signal, value=0)
send_message = Cpt(Signal, value=0)
sample_name = Cpt(Signal, value=0)
angle = Cpt(Signal, value=0)
pixel_size = Cpt(Signal, value=0)
submit = Cpt(
EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True
)
step = Cpt(
EpicsSignal, name="step", read_pv="XOMNYI-XEYE-STEP:0",auto_monitor=True
)
xval_x_dynamic = {
f"xval_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XVAL_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
xval_x = Dcpt(xval_x_dynamic)
yval_y_dynamic = {
f"yval_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YVAL_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
yval_y = Dcpt(yval_y_dynamic)
recbg = Cpt(
EpicsSignal, name="recbg", read_pv="XOMNYI-XEYE-RECBG:0",auto_monitor=True
)
stage_pos_x_dynamic = {
f"stage_pos_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-STAGEPOSX:{i}", {"auto_monitor": True}) for i in range(1, 6)
}
stage_pos_x = Dcpt(stage_pos_x_dynamic)
mvx = Cpt(
EpicsSignal, name="mvx", read_pv="XOMNYI-XEYE-MVX:0",auto_monitor=True
)
mvy = Cpt(
EpicsSignal, name="mvy", read_pv="XOMNYI-XEYE-MVY:0",auto_monitor=True
)
)
step = Cpt(Signal, value=0)
recbg = Cpt(Signal, value=0)
mvx = Cpt(Signal, value=0)
mvy = Cpt(Signal, value=0)
fit_array = Cpt(Signal, value=np.zeros((3, 10)))
fit_params_x = Cpt(Signal, value=np.zeros((2, 3)))
fit_params_y = Cpt(Signal, value=np.zeros((2, 3)))
# Generate width_y_0 to width_y_10
for i in range(11):
locals()[f'width_y_{i}'] = Cpt(Signal, value=0)
# Generate width_x_0 to width_x_10
for i in range(11):
locals()[f'width_x_{i}'] = Cpt(Signal, value=0)
# Generate xval_x_0 to xval_x_10
for i in range(11):
locals()[f'xval_x_{i}'] = Cpt(Signal, value=0)
# Generate yval_y_0 to yval_y_10
for i in range(11):
locals()[f'yval_y_{i}'] = Cpt(Signal, value=0)
# Generate stage_pos_x_1 to stage_pos_x_5
for i in range(1, 6):
locals()[f'stage_pos_x_{i}'] = Cpt(Signal, value=0)

View File

@@ -0,0 +1,2 @@
"""Simulation devices for local csaxs_bec development."""

View File

@@ -0,0 +1,474 @@
from __future__ import annotations
import json
import threading
import numpy as np
from ophyd import Component as Cpt
from ophyd import Device, Kind, Signal
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd_devices import SimPositioner
from ophyd_devices.utils.bec_signals import PreviewSignal
from prettytable import PrettyTable
class FlomniSimController:
"""Controller shim that exposes FLOMNI-specific controller APIs for local simulation."""
USER_ACCESS = [
"socket_put_and_receive",
"socket_put_confirmed",
"drive_axis_to_limit",
"get_motor_limit_switch",
"all_axes_referenced",
"axis_is_referenced",
"is_motor_on",
"feedback_disable",
"feedback_enable_with_reset",
"feedback_enable_without_reset",
"feedback_is_running",
"laser_tracker_on",
"laser_tracker_off",
"laser_tracker_show_all",
"laser_tracker_check_signalstrength",
"laser_tracker_check_enabled",
"show_signal_strength_interferometer",
"read_ssi_interferometer",
"show_cyclic_error_compensation",
"get_pid_x",
"set_rotation_angle",
"fosaz_light_curtain_is_triggered",
"lights_off",
"lights_on",
"galil_show_all",
"move_open_loop_steps",
"find_reference_mark",
]
def __init__(self, device: FlomniSimPositioner):
self.device = device
self.connected = True
self._feedback_running = False
self._laser_tracker_enabled = False
self._lights_on = True
self._mount_mode = False
self._confirm = 0
self._pid_x = -0.06
self._rt_angle = 0.0
self._sensor_voltage = -2.4
self.tracker_info = {
"enabled_y": True,
"enabled_z": True,
"tracker_intensity": 15000,
"threshold_intensity_y": 9000,
"threshold_intensity_z": 9000,
}
def socket_put_and_receive(self, cmd: str) -> str:
cmd_norm = cmd.strip().lower()
if "mg mntmod" in cmd_norm:
return "1" if self._mount_mode else "0"
if "mg mntprgs" in cmd_norm:
return "0"
if "mg confirm" in cmd_norm:
return str(self._confirm)
if "mg @out[9]" in cmd_norm:
return "0"
if "mg@an[1]" in cmd_norm:
return str(self._sensor_voltage)
if "mg @in[14]" in cmd_norm:
return "0"
if "axisref" in cmd_norm:
return "1"
if cmd_norm == "g":
return str(self._pid_x)
if cmd_norm == "o":
return "1"
if cmd_norm.startswith("tp") or cmd_norm.startswith("td"):
return str(float(self.device.readback.get()))
if cmd_norm.startswith("mg_bg"):
return "0"
return "0"
def socket_put_confirmed(self, cmd: str) -> None:
cmd_norm = cmd.strip().lower()
if "xq#mntmode" in cmd_norm:
self._mount_mode = True
elif "xq#posmode" in cmd_norm:
self._mount_mode = False
elif cmd_norm.startswith("confirm="):
try:
self._confirm = int(float(cmd_norm.split("=")[1]))
except ValueError:
self._confirm = 0
elif cmd_norm == "l0":
self._feedback_running = False
elif cmd_norm in ("l1", "l3"):
self._feedback_running = True
elif cmd_norm == "t0":
self._laser_tracker_enabled = False
elif cmd_norm == "t1":
self._laser_tracker_enabled = True
elif cmd_norm == "cb15":
self._lights_on = False
elif cmd_norm == "sb15":
self._lights_on = True
def drive_axis_to_limit(self, axis_Id_numeric=None, direction: str = "forward") -> None:
low, high = self.device.limits
if low >= high:
return
target = high if direction == "forward" else low
status = self.device.move(target)
status.wait(timeout=2)
def get_motor_limit_switch(self, axis_id=None) -> tuple[bool, bool]:
low, high = self.device.limits
if low >= high:
return (False, False)
pos = float(self.device.readback.get())
eps = max(self.device.tolerance.get(), 0.05)
return (abs(pos - low) <= eps, abs(pos - high) <= eps)
def all_axes_referenced(self) -> bool:
return True
def axis_is_referenced(self, axis_id_numeric=None) -> bool:
return True
def is_motor_on(self, axis_id=None) -> bool:
return True
def feedback_disable(self) -> None:
self._feedback_running = False
def feedback_enable_with_reset(self) -> None:
self._feedback_running = True
def feedback_enable_without_reset(self) -> None:
self._feedback_running = True
def feedback_is_running(self) -> bool:
return self._feedback_running
def laser_tracker_on(self) -> None:
self._laser_tracker_enabled = True
def laser_tracker_off(self) -> None:
self._laser_tracker_enabled = False
def laser_tracker_show_all(self) -> None:
t = PrettyTable()
t.title = "Simulated Laser Tracker"
t.field_names = ["name", "value"]
for key, val in self.tracker_info.items():
t.add_row([key, val])
print(t)
def laser_tracker_check_signalstrength(self) -> bool:
return True
def laser_tracker_check_enabled(self) -> bool:
return self._laser_tracker_enabled
def show_signal_strength_interferometer(self) -> None:
print("Simulated interferometer signal strength: OK")
def read_ssi_interferometer(self) -> float:
return 12000.0
def show_cyclic_error_compensation(self) -> None:
print("Simulated cyclic error compensation: initialized")
def get_pid_x(self) -> float:
return self._pid_x
def set_rotation_angle(self, val: float) -> None:
self._rt_angle = float(val)
def fosaz_light_curtain_is_triggered(self) -> bool:
return False
def lights_off(self) -> None:
self._lights_on = False
def lights_on(self) -> None:
self._lights_on = True
def galil_show_all(self) -> None:
print(f"Simulated controller for {self.device.name} connected: {self.connected}")
def move_open_loop_steps(self, axis_id_numeric: int, steps: int, **_kwargs) -> None:
# no-op in simulation
_ = axis_id_numeric, steps
def find_reference_mark(self, axis_id_numeric: int, *_args) -> None:
# no-op in simulation
_ = axis_id_numeric
class JSONSafeSignal(Signal):
"""Signal that coerces complex Python objects to JSON strings for ophyd describe()."""
def _coerce(self, value):
if isinstance(value, np.ndarray):
value = value.tolist()
if isinstance(value, (dict, list, tuple)):
return json.dumps(value)
return value
def put(self, value, *args, **kwargs):
return super().put(self._coerce(value), *args, **kwargs)
def set(self, value, *args, **kwargs):
return super().set(self._coerce(value), *args, **kwargs)
class FlomniSimPositioner(SimPositioner):
"""Simulated positioner that preserves FLOMNI controller-style API."""
USER_ACCESS = ["sim", "readback", "registered_proxies", "controller"]
def __init__(
self,
name: str,
*,
axis_Id: str | None = None,
host: str | None = None,
port: int | None = None,
sign: int = 1,
limits: list[float] | tuple[float, float] | None = None,
**kwargs,
):
super().__init__(name=name, limits=limits, **kwargs)
self.axis_Id = axis_Id
self.host = host
self.port = port
self.sign = sign
self.controller = FlomniSimController(self)
@property
def axis_Id_numeric(self) -> int | None:
if not self.axis_Id:
return None
return ord(self.axis_Id.lower()) - 97
class FlomniSimCamera(Device):
"""Lightweight camera simulation with live mode controls for FLOMNI/OMNY GUI usage."""
USER_ACCESS = ["start_live_mode", "stop_live_mode", "live_mode_enabled", "image", "preview"]
image = Cpt(PreviewSignal, name="image", ndim=2, num_rotation_90=0, transpose=False)
preview = Cpt(PreviewSignal, name="preview", ndim=2, num_rotation_90=0, transpose=False)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
kind=Kind.config,
)
def __init__(
self,
name: str,
*,
camera_id: int | None = None,
camera_ID: int | None = None,
url: str | None = None,
bits_per_pixel: int = 8,
channels: int = 1,
m_n_colormode: int = 0,
num_rotation_90: int = 0,
transpose: bool = False,
force_monochrome: bool = False,
**kwargs,
):
super().__init__(name=name, **kwargs)
self.camera_id = camera_id if camera_id is not None else camera_ID
self.url = url
self.bits_per_pixel = bits_per_pixel
self.channels = channels
self.m_n_colormode = m_n_colormode
self.force_monochrome = force_monochrome
self.image.num_rotation_90 = num_rotation_90
self.image.transpose = transpose
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
self._frame_number = 0
self._shutdown_event = threading.Event()
self._live_mode_thread: threading.Thread | None = None
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None:
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs):
_ = args, kwargs
if bool(value):
self._start_live_mode()
else:
self._stop_live_mode()
def _start_live_mode(self) -> None:
if self._live_mode_thread and self._live_mode_thread.is_alive():
return
self._shutdown_event.clear()
self._live_mode_thread = threading.Thread(target=self._live_mode_loop, daemon=True)
self._live_mode_thread.start()
def _stop_live_mode(self) -> None:
if self._live_mode_thread is None:
return
self._shutdown_event.set()
self._live_mode_thread.join(timeout=1)
self._live_mode_thread = None
self._shutdown_event.clear()
def _live_mode_loop(self) -> None:
while not self._shutdown_event.is_set():
frame = self._generate_frame()
self.image.put(frame)
self.preview.put(frame)
self._shutdown_event.wait(0.2)
def _generate_frame(self) -> np.ndarray:
self._frame_number += 1
x = np.linspace(0, 255, 320, dtype=np.uint8)
y = np.linspace(0, 255, 240, dtype=np.uint8)[:, None]
frame = ((x + y + self._frame_number) % 255).astype(np.uint8)
return frame
def destroy(self):
self._stop_live_mode()
return super().destroy()
class FlomniSimFastShutter(Device):
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "fshstatus_readback"]
shutter = Cpt(Signal, name="shutter", value=0)
def fshopen(self):
self.shutter.put(1)
def fshclose(self):
self.shutter.put(0)
def fshstatus(self):
return int(self.shutter.get())
def fshstatus_readback(self):
return int(self.shutter.get())
def fshinfo(self):
print("Using simulated fast shutter.")
class FlomniSimXRayAlignGUI(Device):
"""Signal-only X-ray alignment GUI backend for simulation."""
update_frame_acqdone = Cpt(Signal, value=0)
update_frame_acq = Cpt(Signal, value=0)
enable_mv_x = Cpt(Signal, value=0)
enable_mv_y = Cpt(Signal, value=0)
send_message = Cpt(Signal, value="")
sample_name = Cpt(Signal, value="")
angle = Cpt(Signal, value=0)
pixel_size = Cpt(Signal, value=0)
submit = Cpt(Signal, name="submit", value=0)
step = Cpt(Signal, value=0)
recbg = Cpt(Signal, value=0)
mvx = Cpt(Signal, value=0)
mvy = Cpt(Signal, value=0)
fit_array = Cpt(Signal, value=np.zeros((3, 10)))
fit_params_x = Cpt(JSONSafeSignal, value="{}")
fit_params_y = Cpt(JSONSafeSignal, value="{}")
for i in range(11):
locals()[f"width_y_{i}"] = Cpt(Signal, value=0)
for i in range(11):
locals()[f"width_x_{i}"] = Cpt(Signal, value=0)
for i in range(11):
locals()[f"xval_x_{i}"] = Cpt(Signal, value=0)
for i in range(11):
locals()[f"yval_y_{i}"] = Cpt(Signal, value=0)
for i in range(1, 6):
locals()[f"stage_pos_x_{i}"] = Cpt(Signal, value=0)
class FlomniSimSampleStorage(Device):
USER_ACCESS = [
"is_sample_slot_used",
"is_sample_in_gripper",
"set_sample_slot",
"unset_sample_slot",
"set_sample_in_gripper",
"unset_sample_in_gripper",
"get_sample_name",
"show_all",
]
sample_placed = Dcpt({f"sample{i}": (Signal, None, {"value": 0}) for i in range(21)})
sample_names = Dcpt({f"sample{i}": (Signal, None, {"value": "-"}) for i in range(21)})
sample_in_gripper = Cpt(Signal, name="sample_in_gripper", value=0)
sample_in_gripper_name = Cpt(Signal, name="sample_in_gripper_name", value="-")
def set_sample_slot(self, slot_nr: int, name: str) -> None:
getattr(self.sample_placed, f"sample{slot_nr}").set(1)
getattr(self.sample_names, f"sample{slot_nr}").set(name)
def unset_sample_slot(self, slot_nr: int) -> None:
getattr(self.sample_placed, f"sample{slot_nr}").set(0)
getattr(self.sample_names, f"sample{slot_nr}").set("-")
def set_sample_in_gripper(self, name: str) -> None:
self.sample_in_gripper.set(1)
self.sample_in_gripper_name.set(name)
def unset_sample_in_gripper(self) -> None:
self.sample_in_gripper.set(0)
self.sample_in_gripper_name.set("-")
def is_sample_slot_used(self, slot_nr: int) -> bool:
return bool(getattr(self.sample_placed, f"sample{slot_nr}").get())
def is_sample_in_gripper(self) -> bool:
return bool(self.sample_in_gripper.get())
def get_sample_name(self, slot_nr: int) -> str:
return str(getattr(self.sample_names, f"sample{slot_nr}").get())
def show_all(self) -> None:
t = PrettyTable()
t.title = "flOMNI sample storage (sim)"
t.field_names = ["slot", "used", "name"]
for slot in range(21):
t.add_row([slot, int(self.is_sample_slot_used(slot)), self.get_sample_name(slot)])
print(t)
class FlomniSimTempHum(Device):
USER_ACCESS = ["show_all", "help"]
temperature_mirror = Cpt(Signal, value=22.0)
temperature_osa = Cpt(Signal, value=22.0)
temperature_heater = Cpt(Signal, value=23.0)
humidity_sensor1 = Cpt(Signal, value=35.0)
humidity_sensor2 = Cpt(Signal, value=34.0)
flow = Cpt(Signal, value=2.0)
suction = Cpt(Signal, value=1.0)
def show_all(self):
print("=== flOMNI Temperature & Humidity (sim) ===")
print(f"Mirror temperature: {self.temperature_mirror.get():.1f} C")
print(f"OSA temperature: {self.temperature_osa.get():.1f} C")
print(f"Heater temperature: {self.temperature_heater.get():.1f} C")
print(f"Humidity sensor 1: {self.humidity_sensor1.get():.1f} %RH")
print(f"Humidity sensor 2: {self.humidity_sensor2.get():.1f} %RH")
print(f"Flow: {self.flow.get():.1f} sccm")
print(f"Suction: {self.suction.get():.1f}")
def help(self):
print("show_all() - display current simulated values")