Compare commits
18 Commits
flomni_hea
...
fix/remove
| Author | SHA1 | Date | |
|---|---|---|---|
| 90d2c99c4a | |||
| 6a4bfc73f6 | |||
|
|
22d8dbe972 | ||
|
|
2411e7be56 | ||
|
f8b20752f5
|
|||
|
e301b94e7c
|
|||
|
|
61011f098d | ||
|
|
efca170f04 | ||
|
|
af2a69f825 | ||
| c8c71d466c | |||
| 94d984b8a2 | |||
|
|
39d2c97247 | ||
|
|
de22611941 | ||
| 4723f6768b | |||
|
|
5b76c3f769 | ||
| 4590b85010 | |||
| 7233fb8d35 | |||
| 32d3232008 |
@@ -468,7 +468,7 @@ class FlomniSampleTransferMixin:
|
||||
def ftransfer_flomni_stage_out(self):
|
||||
|
||||
self.flomnigui_show_cameras()
|
||||
|
||||
self.flomnigui_raise()
|
||||
|
||||
target_pos = -162
|
||||
if np.isclose(dev.fsamx.readback.get(), target_pos, 0.01):
|
||||
@@ -924,7 +924,20 @@ class FlomniSampleTransferMixin:
|
||||
|
||||
|
||||
class FlomniAlignmentMixin:
|
||||
default_correction_file = "correction_flomni_20210300_360deg.txt"
|
||||
import csaxs_bec
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# Ensure this is a Path object, not a string
|
||||
csaxs_bec_basepath = Path(csaxs_bec.__file__)
|
||||
|
||||
default_correction_file_rel = "correction_flomni_20210300_360deg.txt"
|
||||
|
||||
# Build the absolute path correctly
|
||||
default_correction_file = (
|
||||
csaxs_bec_basepath.parent / 'bec_ipython_client' / 'plugins' / 'flomni' / default_correction_file_rel
|
||||
).resolve()
|
||||
|
||||
|
||||
def reset_correction(self, use_default_correction=True):
|
||||
"""
|
||||
@@ -1172,12 +1185,12 @@ class Flomni(
|
||||
self.align = XrayEyeAlign(self.client, self)
|
||||
self.set_client(client)
|
||||
|
||||
def start_x_ray_eye_alignment(self):
|
||||
def start_x_ray_eye_alignment(self, keep_shutter_open=False):
|
||||
|
||||
if self.OMNYTools.yesno("Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y"):
|
||||
self.align = XrayEyeAlign(self.client, self)
|
||||
try:
|
||||
self.align.align()
|
||||
self.align.align(keep_shutter_open)
|
||||
except KeyboardInterrupt as exc:
|
||||
fsamx_in = self._get_user_param_safe(dev.fsamx, "in")
|
||||
if np.isclose(fsamx_in, dev.fsamx.readback.get(), 0.5):
|
||||
@@ -1186,11 +1199,11 @@ class Flomni(
|
||||
umv(dev.fsamx, fsamx_in)
|
||||
raise exc
|
||||
|
||||
def xrayeye_update_frame(self):
|
||||
self.align.update_frame()
|
||||
def xrayeye_update_frame(self,keep_shutter_open=False):
|
||||
self.align.update_frame(keep_shutter_open)
|
||||
|
||||
def xrayeye_alignment_start(self):
|
||||
self.start_x_ray_eye_alignment()
|
||||
def xrayeye_alignment_start(self, keep_shutter_open=False):
|
||||
self.start_x_ray_eye_alignment(keep_shutter_open)
|
||||
|
||||
def drive_axis_to_limit(self, device, direction):
|
||||
axis_id = device._config["deviceConfig"].get("axis_Id")
|
||||
|
||||
@@ -33,7 +33,7 @@ class FlomniOpticsMixin:
|
||||
feyex_in = self._get_user_param_safe("feyex", "in")
|
||||
feyey_in = self._get_user_param_safe("feyey", "in")
|
||||
umv(dev.feyex, feyex_in, dev.feyey, feyey_in)
|
||||
self.align.update_frame()
|
||||
#self.align.update_frame()
|
||||
|
||||
def _ffzp_in(self):
|
||||
foptx_in = self._get_user_param_safe("foptx", "in")
|
||||
|
||||
@@ -18,12 +18,8 @@ class flomniGuiToolsError(Exception):
|
||||
class flomniGuiTools:
|
||||
|
||||
def __init__(self):
|
||||
self.gui_window = None
|
||||
self.camera_gripper_image = None
|
||||
self.camera_overview_image = None
|
||||
self.progressbar = None
|
||||
self.text_box = None
|
||||
self.idle_text_box = None
|
||||
self.progressbar = None
|
||||
|
||||
def set_client(self, client):
|
||||
self.client = client
|
||||
@@ -38,29 +34,58 @@ class flomniGuiTools:
|
||||
def flomnigui_stop_gui(self):
|
||||
self.gui.flomni.hide()
|
||||
|
||||
def flomnigui_raise(self):
|
||||
self.gui.flomni.raise_window()
|
||||
|
||||
# def flomnigui_show_xeyealign(self):
|
||||
# self.flomnigui_show_gui()
|
||||
# if self.xeyegui is None:
|
||||
# self.flomnigui_remove_all_docks()
|
||||
# self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
|
||||
# # start live
|
||||
# if not dev.cam_xeye.live_mode:
|
||||
# dev.cam_xeye.live_mode = True
|
||||
|
||||
def flomnigui_show_xeyealign(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self._flomnigui_check_attribute_not_exists("xeyegui"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
|
||||
# start live
|
||||
if not dev.cam_xeye.live_mode:
|
||||
dev.cam_xeye.live_mode = True
|
||||
|
||||
|
||||
def _flomnigui_check_attribute_not_exists(self, attribute_name):
|
||||
if hasattr(self.gui,"flomni"):
|
||||
if hasattr(self.gui.flomni,attribute_name):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def flomnigui_show_cameras(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self.camera_gripper_image is None or self.camera_overview_image is None:
|
||||
if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
self.camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
|
||||
camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
|
||||
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
|
||||
self.camera_gripper_image.image(("cam_flomni_gripper", "preview"))
|
||||
self.camera_gripper_image.lock_aspect_ratio = True
|
||||
self.camera_gripper_image.enable_fps_monitor = True
|
||||
self.camera_gripper_image.enable_toolbar = False
|
||||
self.camera_gripper_image.outer_axes = False
|
||||
self.camera_gripper_image.inner_axes = False
|
||||
camera_gripper_image.image(("cam_flomni_gripper", "preview"))
|
||||
camera_gripper_image.lock_aspect_ratio = True
|
||||
camera_gripper_image.enable_fps_monitor = True
|
||||
camera_gripper_image.enable_toolbar = False
|
||||
camera_gripper_image.outer_axes = False
|
||||
camera_gripper_image.inner_axes = False
|
||||
dev.cam_flomni_gripper.start_live_mode()
|
||||
else:
|
||||
print("Cannot open camera_gripper. Device does not exist.")
|
||||
self.camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
|
||||
camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
|
||||
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
|
||||
self.camera_overview_image.image(("cam_flomni_overview", "preview"))
|
||||
self.camera_overview_image.lock_aspect_ratio = True
|
||||
self.camera_overview_image.enable_fps_monitor = True
|
||||
self.camera_overview_image.enable_toolbar = False
|
||||
self.camera_overview_image.outer_axes = False
|
||||
self.camera_overview_image.inner_axes = False
|
||||
camera_overview_image.image(("cam_flomni_overview", "preview"))
|
||||
camera_overview_image.lock_aspect_ratio = True
|
||||
camera_overview_image.enable_fps_monitor = True
|
||||
camera_overview_image.enable_toolbar = False
|
||||
camera_overview_image.outer_axes = False
|
||||
camera_overview_image.inner_axes = False
|
||||
dev.cam_flomni_overview.start_live_mode()
|
||||
else:
|
||||
print("Cannot open camera_overview. Device does not exist.")
|
||||
@@ -68,18 +93,16 @@ class flomniGuiTools:
|
||||
def flomnigui_remove_all_docks(self):
|
||||
dev.cam_flomni_overview.stop_live_mode()
|
||||
dev.cam_flomni_gripper.stop_live_mode()
|
||||
self.gui.flomni.delete_all()
|
||||
self.camera_gripper_image = None
|
||||
self.camera_overview_image = None
|
||||
dev.cam_xeye.live_mode = False
|
||||
self.gui.flomni.delete_all()
|
||||
self.progressbar = None
|
||||
self.text_box = None
|
||||
self.idle_text_box = None
|
||||
|
||||
def flomnigui_idle(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self.idle_text_box is None:
|
||||
if self._flomnigui_check_attribute_not_exists("idle_text_box"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
self.idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
|
||||
idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
|
||||
text = (
|
||||
"<pre>"
|
||||
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n"
|
||||
@@ -89,7 +112,7 @@ class flomniGuiTools:
|
||||
+ "`--' `--' `-----' `--' `--'`--' `--'`--' \n"
|
||||
+ "</pre>"
|
||||
)
|
||||
self.idle_text_box.set_html_text(text)
|
||||
idle_text_box.set_html_text(text)
|
||||
|
||||
def _flomnicam_check_device_exists(self, device):
|
||||
try:
|
||||
@@ -101,7 +124,7 @@ class flomniGuiTools:
|
||||
|
||||
def flomnigui_show_progress(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self.progressbar is None:
|
||||
if self._flomnigui_check_attribute_not_exists("progressbar"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
# Add a new dock with a RingProgressBar widget
|
||||
self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar")
|
||||
@@ -132,7 +155,7 @@ class flomniGuiTools:
|
||||
* 100
|
||||
)
|
||||
self.progressbar.set_value([progress, subtomo_progress, 0])
|
||||
|
||||
if self.text_box is not None:
|
||||
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
|
||||
self.text_box.set_plain_text(text)
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
|
||||
|
||||
logger = bec_logger.logger
|
||||
# import builtins to avoid linter errors
|
||||
@@ -22,6 +22,7 @@ if TYPE_CHECKING:
|
||||
|
||||
class XrayEyeAlign:
|
||||
# pixel calibration, multiply to get mm
|
||||
labview=False
|
||||
PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning
|
||||
|
||||
def __init__(self, client, flomni: Flomni) -> None:
|
||||
@@ -40,28 +41,40 @@ class XrayEyeAlign:
|
||||
def save_frame(self):
|
||||
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
|
||||
|
||||
def update_frame(self):
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
# start live
|
||||
def update_frame(self,keep_shutter_open=False):
|
||||
if self.labview:
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
|
||||
if not self.labview:
|
||||
self.flomni.flomnigui_show_xeyealign()
|
||||
if not dev.cam_xeye.live_mode:
|
||||
dev.cam_xeye.live_mode = True
|
||||
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 1)
|
||||
# wait for start live
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
time.sleep(0.5)
|
||||
print("waiting for live view to start...")
|
||||
if self.labview:
|
||||
# wait for start live
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
time.sleep(0.5)
|
||||
print("waiting for live view to start...")
|
||||
|
||||
fshopen()
|
||||
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
if self.labview:
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
print("waiting for new frame...")
|
||||
time.sleep(0.5)
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
print("waiting for new frame...")
|
||||
time.sleep(0.5)
|
||||
|
||||
time.sleep(0.5)
|
||||
# stop live view
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 0)
|
||||
time.sleep(1)
|
||||
# fshclose
|
||||
print("got new frame")
|
||||
if not keep_shutter_open:
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 0)
|
||||
time.sleep(0.1)
|
||||
fshclose()
|
||||
print("got new frame")
|
||||
else:
|
||||
print("Staying in live view, shutter is and remains open!")
|
||||
|
||||
def tomo_rotate(self, val: float):
|
||||
# pylint: disable=undefined-variable
|
||||
@@ -87,12 +100,23 @@ class XrayEyeAlign:
|
||||
def send_message(self, msg: str):
|
||||
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
|
||||
|
||||
def align(self):
|
||||
def align(self,keep_shutter_open=False):
|
||||
if not keep_shutter_open:
|
||||
print("This routine can be called with paramter keep_shutter_open=True to keep the shutter always open")
|
||||
self.send_message("Getting things ready. Please wait...")
|
||||
|
||||
#potential unresolved movement requests to zero
|
||||
epics_put("XOMNYI-XEYE-MVX:0", 0)
|
||||
epics_put("XOMNYI-XEYE-MVY:0", 0)
|
||||
|
||||
# reset shift xy and fov params
|
||||
self._reset_init_values()
|
||||
|
||||
self.flomni.lights_off()
|
||||
|
||||
self.flomni.flomnigui_show_xeyealign()
|
||||
self.flomni.flomnigui_raise()
|
||||
|
||||
self.tomo_rotate(0)
|
||||
epics_put("XOMNYI-XEYE-ANGLE:0", 0)
|
||||
|
||||
@@ -119,7 +143,7 @@ class XrayEyeAlign:
|
||||
umv(dev.fsamx, fsamx_in - 0.25)
|
||||
|
||||
self.flomni.ffzp_in()
|
||||
self.update_frame()
|
||||
self.update_frame(keep_shutter_open)
|
||||
|
||||
# enable submit buttons
|
||||
self.movement_buttons_enabled = False
|
||||
@@ -152,17 +176,18 @@ class XrayEyeAlign:
|
||||
self.flomni.feedback_disable()
|
||||
umv(dev.fsamx, fsamx_in - 0.25)
|
||||
|
||||
self.update_frame()
|
||||
epics_put("XOMNYI-XEYE-RECBG:0", 1)
|
||||
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
|
||||
time.sleep(0.5)
|
||||
print("waiting for background frame...")
|
||||
if self.labview:
|
||||
self.update_frame(keep_shutter_open)
|
||||
epics_put("XOMNYI-XEYE-RECBG:0", 1)
|
||||
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
|
||||
time.sleep(0.5)
|
||||
print("waiting for background frame...")
|
||||
|
||||
umv(dev.fsamx, fsamx_in)
|
||||
time.sleep(0.5)
|
||||
self.flomni.feedback_enable_with_reset()
|
||||
|
||||
self.update_frame()
|
||||
self.update_frame(keep_shutter_open)
|
||||
self.send_message("Adjust sample height and submit center")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
self.movement_buttons_enabled = True
|
||||
@@ -175,7 +200,7 @@ class XrayEyeAlign:
|
||||
umv(dev.rtx, 0)
|
||||
self.tomo_rotate(k * 45)
|
||||
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
|
||||
self.update_frame()
|
||||
self.update_frame(keep_shutter_open)
|
||||
self.send_message("Submit sample center")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
|
||||
@@ -199,7 +224,7 @@ class XrayEyeAlign:
|
||||
if k > 0:
|
||||
epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000)
|
||||
time.sleep(3)
|
||||
self.update_frame()
|
||||
self.update_frame(keep_shutter_open)
|
||||
|
||||
if k < 2:
|
||||
# allow movements, store movements to calculate center
|
||||
@@ -210,7 +235,7 @@ class XrayEyeAlign:
|
||||
time.sleep(2)
|
||||
epics_put("XOMNYI-XEYE-MVY:0", 0)
|
||||
self.flomni.feedback_enable_with_reset()
|
||||
self.update_frame()
|
||||
self.update_frame(keep_shutter_open)
|
||||
time.sleep(0.2)
|
||||
|
||||
self.write_output()
|
||||
@@ -221,8 +246,16 @@ class XrayEyeAlign:
|
||||
|
||||
umv(dev.rtx, 0)
|
||||
|
||||
# free camera
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 2)
|
||||
# free camera
|
||||
if self.labview:
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 2)
|
||||
if keep_shutter_open and not self.labview:
|
||||
if self.flomni.OMNYTools.yesno("Close the shutter now?","y"):
|
||||
fshclose()
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 0)
|
||||
if not self.labview:
|
||||
self.flomni.flomnigui_idle()
|
||||
|
||||
|
||||
print(
|
||||
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy"
|
||||
|
||||
@@ -61,25 +61,3 @@ bec._beamline_mixin._bl_info_register(OperatorInfo)
|
||||
# SETUP PROMPTS
|
||||
bec._ip.prompts.session_name = _session_name
|
||||
bec._ip.prompts.status = 1
|
||||
|
||||
# REGISTER BEAMLINE CHECKS
|
||||
from bec_lib.bl_conditions import (
|
||||
FastOrbitFeedbackCondition,
|
||||
LightAvailableCondition,
|
||||
ShutterCondition,
|
||||
)
|
||||
|
||||
if "sls_machine_status" in dev:
|
||||
print("Registering light available condition for SLS machine status")
|
||||
_light_available_condition = LightAvailableCondition(dev.sls_machine_status)
|
||||
bec.bl_checks.register(_light_available_condition)
|
||||
|
||||
if "x12sa_es1_shutter_status" in dev:
|
||||
print("Registering shutter condition for X12SA ES1 shutter status")
|
||||
_shutter_condition = ShutterCondition(dev.x12sa_es1_shutter_status)
|
||||
bec.bl_checks.register(_shutter_condition)
|
||||
|
||||
# if hasattr(dev, "sls_fast_orbit_feedback"):
|
||||
# print("Registering fast orbit feedback condition for SLS fast orbit feedback")
|
||||
# _fast_orbit_feedback_condition = FastOrbitFeedbackCondition(dev.sls_fast_orbit_feedback)
|
||||
# bec.bl_checks.register(_fast_orbit_feedback_condition)
|
||||
|
||||
@@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -14,6 +14,7 @@ logger = bec_logger.logger
|
||||
|
||||
_Widgets = {
|
||||
"OmnyAlignment": "OmnyAlignment",
|
||||
"XRayEye": "XRayEye",
|
||||
}
|
||||
|
||||
|
||||
@@ -73,3 +74,75 @@ class OmnyAlignment(RPCBase):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
|
||||
class XRayEye(RPCBase):
|
||||
@rpc_call
|
||||
def active_roi(self) -> "BaseROI | None":
|
||||
"""
|
||||
Return the currently active ROI, or None if no ROI is active.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def enable_live_view(self):
|
||||
"""
|
||||
Get or set the live view enabled state.
|
||||
"""
|
||||
|
||||
@enable_live_view.setter
|
||||
@rpc_call
|
||||
def enable_live_view(self):
|
||||
"""
|
||||
Get or set the live view enabled state.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def user_message(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@user_message.setter
|
||||
@rpc_call
|
||||
def user_message(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def sample_name(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@sample_name.setter
|
||||
@rpc_call
|
||||
def sample_name(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def enable_move_buttons(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@enable_move_buttons.setter
|
||||
@rpc_call
|
||||
def enable_move_buttons(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
|
||||
class XRayEye2DControl(RPCBase):
|
||||
@rpc_call
|
||||
def remove(self):
|
||||
"""
|
||||
Cleanup the BECConnector
|
||||
"""
|
||||
|
||||
@@ -63,7 +63,7 @@ class OmnyAlignment(BECWidget, QWidget):
|
||||
|
||||
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
|
||||
|
||||
self.ui.moveUpButton.clicked.connect(self.on_move_up)
|
||||
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
|
||||
|
||||
|
||||
@property
|
||||
@@ -98,10 +98,10 @@ class OmnyAlignment(BECWidget, QWidget):
|
||||
logger.info(f"Live view is enabled: {enabled}")
|
||||
image: Image = self.ui.image
|
||||
if enabled:
|
||||
image.image("cam200")
|
||||
image.image("cam_xeye")
|
||||
return
|
||||
|
||||
image.disconnect_monitor("cam200")
|
||||
image.disconnect_monitor("cam_xeye")
|
||||
|
||||
|
||||
@property
|
||||
|
||||
@@ -86,7 +86,7 @@
|
||||
<bool>false</bool>
|
||||
</property>
|
||||
<property name="monitor" stdset="0">
|
||||
<string>cam200</string>
|
||||
<string>cam_xeye</string>
|
||||
</property>
|
||||
<property name="rotation" stdset="0">
|
||||
<number>3</number>
|
||||
|
||||
0
csaxs_bec/bec_widgets/widgets/xray_eye/__init__.py
Normal file
0
csaxs_bec/bec_widgets/widgets/xray_eye/__init__.py
Normal file
15
csaxs_bec/bec_widgets/widgets/xray_eye/register_x_ray_eye.py
Normal file
15
csaxs_bec/bec_widgets/widgets/xray_eye/register_x_ray_eye.py
Normal file
@@ -0,0 +1,15 @@
|
||||
def main(): # pragma: no cover
|
||||
from qtpy import PYSIDE6
|
||||
|
||||
if not PYSIDE6:
|
||||
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye_plugin import XRayEyePlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(XRayEyePlugin())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
426
csaxs_bec/bec_widgets/widgets/xray_eye/x_ray_eye.py
Normal file
426
csaxs_bec/bec_widgets/widgets/xray_eye/x_ray_eye.py
Normal file
@@ -0,0 +1,426 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from bec_lib import bec_logger
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_qthemes import material_icon
|
||||
from bec_widgets import BECWidget, SafeProperty, SafeSlot
|
||||
from bec_widgets.widgets.plots.image.image import Image
|
||||
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
|
||||
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
|
||||
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
|
||||
from qtpy.QtCore import Qt, QTimer
|
||||
from qtpy.QtWidgets import (
|
||||
QFrame,
|
||||
QGridLayout,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QLineEdit,
|
||||
QPushButton,
|
||||
QSizePolicy,
|
||||
QSpinBox,
|
||||
QToolButton,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
logger = bec_logger.logger
|
||||
CAMERA = ("cam_xeye", "image")
|
||||
|
||||
|
||||
class XRayEye2DControl(BECWidget, QWidget):
|
||||
def __init__(self, parent=None, step_size: int = 100, *arg, **kwargs):
|
||||
super().__init__(parent=parent, *arg, **kwargs)
|
||||
self.get_bec_shortcuts()
|
||||
self._step_size = step_size
|
||||
self.root_layout = QGridLayout(self)
|
||||
self.setStyleSheet("""
|
||||
QToolButton {
|
||||
border: 1px solid;
|
||||
border-radius: 4px;
|
||||
}
|
||||
""")
|
||||
# Up
|
||||
self.move_up_button = QToolButton(parent=self)
|
||||
self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
|
||||
self.root_layout.addWidget(self.move_up_button, 0, 2)
|
||||
# Up tweak button
|
||||
self.move_up_tweak_button = QToolButton(parent=self)
|
||||
self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
|
||||
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
|
||||
|
||||
# Left
|
||||
self.move_left_button = QToolButton(parent=self)
|
||||
self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
|
||||
self.root_layout.addWidget(self.move_left_button, 2, 0)
|
||||
# Left tweak button
|
||||
self.move_left_tweak_button = QToolButton(parent=self)
|
||||
self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
|
||||
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
|
||||
|
||||
# Right
|
||||
self.move_right_button = QToolButton(parent=self)
|
||||
self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
|
||||
self.root_layout.addWidget(self.move_right_button, 2, 4)
|
||||
# Right tweak button
|
||||
self.move_right_tweak_button = QToolButton(parent=self)
|
||||
self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
|
||||
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
|
||||
|
||||
# Down
|
||||
self.move_down_button = QToolButton(parent=self)
|
||||
self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
|
||||
self.root_layout.addWidget(self.move_down_button, 4, 2)
|
||||
# Down tweak button
|
||||
self.move_down_tweak_button = QToolButton(parent=self)
|
||||
self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
|
||||
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
|
||||
|
||||
# Connections
|
||||
self.move_up_button.clicked.connect(lambda: self.move("up", tweak=False))
|
||||
self.move_up_tweak_button.clicked.connect(lambda: self.move("up", tweak=True))
|
||||
self.move_down_button.clicked.connect(lambda: self.move("down", tweak=False))
|
||||
self.move_down_tweak_button.clicked.connect(lambda: self.move("down", tweak=True))
|
||||
self.move_left_button.clicked.connect(lambda: self.move("left", tweak=False))
|
||||
self.move_left_tweak_button.clicked.connect(lambda: self.move("left", tweak=True))
|
||||
self.move_right_button.clicked.connect(lambda: self.move("right", tweak=False))
|
||||
self.move_right_tweak_button.clicked.connect(lambda: self.move("right", tweak=True))
|
||||
|
||||
@SafeProperty(int)
|
||||
def step_size(self) -> int:
|
||||
return self._step_size
|
||||
|
||||
@step_size.setter
|
||||
def step_size(self, step_size: int):
|
||||
self._step_size = step_size
|
||||
|
||||
@SafeSlot(bool)
|
||||
def enable_controls_hor(self, enable: bool):
|
||||
self.move_left_button.setEnabled(enable)
|
||||
self.move_left_tweak_button.setEnabled(enable)
|
||||
self.move_right_button.setEnabled(enable)
|
||||
self.move_right_tweak_button.setEnabled(enable)
|
||||
|
||||
@SafeSlot(bool)
|
||||
def enable_controls_ver(self, enable: bool):
|
||||
self.move_up_button.setEnabled(enable)
|
||||
self.move_up_tweak_button.setEnabled(enable)
|
||||
self.move_down_button.setEnabled(enable)
|
||||
self.move_down_tweak_button.setEnabled(enable)
|
||||
|
||||
def move(self, direction: str, tweak: bool = False):
|
||||
step = self._step_size
|
||||
if tweak:
|
||||
step = int(self._step_size / 5)
|
||||
if direction == "up":
|
||||
self.dev.omny_xray_gui.mvy.set(step)
|
||||
elif direction == "down":
|
||||
self.dev.omny_xray_gui.mvy.set(-step)
|
||||
elif direction == "left":
|
||||
self.dev.omny_xray_gui.mvx.set(-step)
|
||||
elif direction == "right":
|
||||
self.dev.omny_xray_gui.mvx.set(step)
|
||||
else:
|
||||
logger.warning(f"Unknown direction {direction} for move command.")
|
||||
|
||||
|
||||
class XRayEye(BECWidget, QWidget):
|
||||
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
|
||||
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
|
||||
PLUGIN = True
|
||||
|
||||
def __init__(self, parent=None, **kwargs):
|
||||
super().__init__(parent=parent, **kwargs)
|
||||
self.get_bec_shortcuts()
|
||||
|
||||
self._init_ui()
|
||||
self._make_connections()
|
||||
|
||||
# Connection to redis endpoints
|
||||
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
|
||||
self.connect_motors()
|
||||
self.resize(800, 600)
|
||||
QTimer.singleShot(0, self._init_gui_trigger)
|
||||
|
||||
def _init_ui(self):
|
||||
self.core_layout = QHBoxLayout(self)
|
||||
|
||||
self.image = Image(parent=self)
|
||||
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
|
||||
self.image.inner_axes = False # Disable inner axes to maximize image area
|
||||
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
|
||||
|
||||
# Control panel on the right: vertical layout inside a fixed-width widget
|
||||
self.control_panel = QWidget(parent=self)
|
||||
self.control_panel_layout = QVBoxLayout(self.control_panel)
|
||||
self.control_panel_layout.setContentsMargins(0, 0, 0, 0)
|
||||
self.control_panel_layout.setSpacing(10)
|
||||
|
||||
# ROI toolbar + Live toggle (header row)
|
||||
self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
|
||||
compact_orientation="horizontal")
|
||||
header_row = QHBoxLayout()
|
||||
header_row.setContentsMargins(0, 0, 0, 0)
|
||||
header_row.setSpacing(8)
|
||||
header_row.addWidget(self.roi_manager, 0)
|
||||
header_row.addStretch()
|
||||
self.live_preview_label = QLabel("Live Preview", parent=self)
|
||||
self.live_preview_toggle = ToggleSwitch(parent=self)
|
||||
self.live_preview_toggle.checked = False
|
||||
header_row.addWidget(self.live_preview_label, 0, Qt.AlignVCenter)
|
||||
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignVCenter)
|
||||
self.control_panel_layout.addLayout(header_row)
|
||||
|
||||
# separator
|
||||
self.control_panel_layout.addWidget(self._create_separator())
|
||||
|
||||
# 2D Positioner (fixed size)
|
||||
self.motor_control_2d = XRayEye2DControl(parent=self)
|
||||
self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignTop | Qt.AlignCenter)
|
||||
|
||||
# separator
|
||||
self.control_panel_layout.addWidget(self._create_separator())
|
||||
|
||||
# Step size label
|
||||
step_size_form = QGridLayout()
|
||||
# General Step size
|
||||
self.step_size = QSpinBox(parent=self)
|
||||
self.step_size.setRange(10, 100)
|
||||
self.step_size.setSingleStep(10)
|
||||
self.step_size.setValue(100)
|
||||
# Submit button
|
||||
self.submit_button = QPushButton("Submit", parent=self)
|
||||
# Add to layout form
|
||||
step_size_form.addWidget(QLabel("Horizontal", parent=self), 0, 0)
|
||||
step_size_form.addWidget(self.step_size, 0, 1)
|
||||
step_size_form.addWidget(QLabel("Vertical", parent=self), 1, 0)
|
||||
step_size_form.addWidget(self.submit_button, 2, 0, 1, 2)
|
||||
|
||||
# Add form to control panel
|
||||
self.control_panel_layout.addLayout(step_size_form)
|
||||
|
||||
# Push form to bottom
|
||||
self.control_panel_layout.addStretch()
|
||||
|
||||
# Sample/Message form (bottom)
|
||||
form = QGridLayout()
|
||||
self.sample_name_line_edit = QLineEdit(parent=self)
|
||||
self.sample_name_line_edit.setReadOnly(True)
|
||||
form.addWidget(QLabel("Sample", parent=self), 0, 0)
|
||||
form.addWidget(self.sample_name_line_edit, 0, 1)
|
||||
self.message_line_edit = QLineEdit(parent=self)
|
||||
self.message_line_edit.setReadOnly(True)
|
||||
form.addWidget(QLabel("Message", parent=self), 1, 0)
|
||||
form.addWidget(self.message_line_edit, 1, 1)
|
||||
self.control_panel_layout.addLayout(form)
|
||||
|
||||
# Fix panel width and allow vertical expansion
|
||||
self.control_panel.adjustSize()
|
||||
p_hint = self.control_panel.sizeHint()
|
||||
self.control_panel.setFixedWidth(p_hint.width())
|
||||
self.control_panel.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
|
||||
|
||||
# Core Layout: image (expanding) | control panel (fixed)
|
||||
self.core_layout.addWidget(self.image)
|
||||
self.core_layout.addWidget(self.control_panel)
|
||||
|
||||
def _make_connections(self):
|
||||
# Fetch initial state
|
||||
self.on_live_view_enabled(True)
|
||||
self.step_size.setValue(self.motor_control_2d.step_size)
|
||||
|
||||
# Make connections
|
||||
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
|
||||
self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
|
||||
self.submit_button.clicked.connect(self.submit)
|
||||
|
||||
def _create_separator(self):
|
||||
sep = QFrame(parent=self)
|
||||
sep.setFrameShape(QFrame.HLine)
|
||||
sep.setFrameShadow(QFrame.Sunken)
|
||||
sep.setLineWidth(1)
|
||||
return sep
|
||||
|
||||
def _init_gui_trigger(self):
|
||||
self.dev.omny_xray_gui.read()
|
||||
|
||||
################################################################################
|
||||
# Device Connection logic
|
||||
################################################################################
|
||||
|
||||
def connect_motors(self):
|
||||
""" Checks one of the possible motors for flomni, omny and lamni setup."""
|
||||
possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
|
||||
|
||||
for motor in possible_motors:
|
||||
if motor in self.dev:
|
||||
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
|
||||
logger.info(f"Succesfully connected to {motor}")
|
||||
|
||||
################################################################################
|
||||
# Properties ported from the original OmnyAlignment, can be adjusted as needed
|
||||
################################################################################
|
||||
@SafeProperty(str)
|
||||
def user_message(self):
|
||||
return self.message_line_edit.text()
|
||||
|
||||
@user_message.setter
|
||||
def user_message(self, message: str):
|
||||
self.message_line_edit.setText(message)
|
||||
|
||||
@SafeProperty(str)
|
||||
def sample_name(self):
|
||||
return self.sample_name_line_edit.text()
|
||||
|
||||
@sample_name.setter
|
||||
def sample_name(self, message: str):
|
||||
self.sample_name_line_edit.setText(message)
|
||||
|
||||
@SafeProperty(bool)
|
||||
def enable_move_buttons(self):
|
||||
return self.motor_control_2d.isEnabled()
|
||||
|
||||
@enable_move_buttons.setter
|
||||
def enable_move_buttons(self, enabled: bool):
|
||||
self.motor_control_2d.setEnabled(enabled)
|
||||
|
||||
def active_roi(self) -> BaseROI | None:
|
||||
"""Return the currently active ROI, or None if no ROI is active."""
|
||||
return self.roi_manager.single_active_roi
|
||||
|
||||
################################################################################
|
||||
# Slots ported from the original OmnyAlignment, can be adjusted as needed
|
||||
################################################################################
|
||||
|
||||
@SafeSlot()
|
||||
def get_roi_coordinates(self) -> dict | None:
|
||||
"""Get the coordinates of the currently active ROI."""
|
||||
roi = self.roi_manager.single_active_roi
|
||||
if roi is None:
|
||||
logger.warning("No active ROI")
|
||||
return None
|
||||
logger.info(f"Active ROI coordinates: {roi.get_coordinates()}")
|
||||
return roi.get_coordinates()
|
||||
|
||||
@SafeSlot(bool)
|
||||
def on_live_view_enabled(self, enabled: bool):
|
||||
logger.info(f"Live view is enabled: {enabled}")
|
||||
self.live_preview_toggle.blockSignals(True)
|
||||
if enabled:
|
||||
self.live_preview_toggle.checked = enabled
|
||||
self.image.image(CAMERA)
|
||||
self.live_preview_toggle.blockSignals(False)
|
||||
return
|
||||
|
||||
self.image.disconnect_monitor(CAMERA)
|
||||
self.live_preview_toggle.checked = enabled
|
||||
self.live_preview_toggle.blockSignals(False)
|
||||
|
||||
@SafeSlot(bool, bool)
|
||||
def on_motors_enable(self, x_enable: bool, y_enable: bool):
|
||||
"""
|
||||
Enable/Disable motor controls
|
||||
|
||||
Args:
|
||||
x_enable(bool): enable x motor controls
|
||||
y_enable(bool): enable y motor controls
|
||||
"""
|
||||
self.motor_control_2d.enable_controls_hor(x_enable)
|
||||
self.motor_control_2d.enable_controls_ver(y_enable)
|
||||
|
||||
@SafeSlot(int)
|
||||
def enable_submit_button(self, enable: int):
|
||||
"""
|
||||
Enable/disable submit button.
|
||||
Args:
|
||||
enable(int): -1 disable else enable
|
||||
"""
|
||||
if enable == -1:
|
||||
self.submit_button.setEnabled(False)
|
||||
else:
|
||||
self.submit_button.setEnabled(True)
|
||||
|
||||
@SafeSlot(bool, bool)
|
||||
def on_tomo_angle_readback(self, data: dict, meta: dict):
|
||||
#TODO implement if needed
|
||||
print(f"data: {data}")
|
||||
print(f"meta: {meta}")
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def device_updates(self, data: dict, meta: dict):
|
||||
"""
|
||||
Slot to handle device updates from omny_xray_gui device.
|
||||
|
||||
Args:
|
||||
data(dict): data from device
|
||||
meta(dict): metadata from device
|
||||
"""
|
||||
|
||||
signals = data.get('signals')
|
||||
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
|
||||
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
|
||||
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
|
||||
self.on_live_view_enabled(bool(enable_live_preview))
|
||||
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
|
||||
|
||||
# Signals from epics gui device
|
||||
# send message
|
||||
user_message = signals.get("omny_xray_gui_send_message").get('value')
|
||||
self.user_message = user_message
|
||||
# sample name
|
||||
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
|
||||
self.sample_name = sample_message
|
||||
# enable frame acquisition
|
||||
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
|
||||
self.on_live_view_enabled(bool(update_frame_acq))
|
||||
# enable submit button
|
||||
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
|
||||
self.enable_submit_button(enable_submit_button)
|
||||
|
||||
@SafeSlot()
|
||||
def submit(self):
|
||||
"""Execute submit action by submit button."""
|
||||
if self.roi_manager.single_active_roi is None:
|
||||
logger.warning("No active ROI")
|
||||
return
|
||||
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
|
||||
roi_center_x = roi_coordinates['center_x']
|
||||
roi_center_y = roi_coordinates['center_y']
|
||||
# Case of rectangular ROI
|
||||
if isinstance(self.roi_manager.single_active_roi, RectangularROI):
|
||||
roi_width = roi_coordinates['width']
|
||||
roi_height = roi_coordinates['height']
|
||||
elif isinstance(self.roi_manager.single_active_roi, CircularROI):
|
||||
roi_width = roi_coordinates['diameter']
|
||||
roi_height = roi_coordinates['radius']
|
||||
else:
|
||||
logger.warning("Unsupported ROI type for submit action.")
|
||||
return
|
||||
|
||||
print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
|
||||
# submit roi coordinates
|
||||
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
|
||||
|
||||
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
|
||||
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
|
||||
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
|
||||
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
|
||||
self.dev.omny_xray_gui.submit.set(1)
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
|
||||
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
|
||||
getattr(self.dev,CAMERA[0]).live_mode = False
|
||||
super().cleanup()
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
from qtpy.QtWidgets import QApplication
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
|
||||
win = XRayEye()
|
||||
|
||||
win.resize(1000, 800)
|
||||
win.show()
|
||||
sys.exit(app.exec_())
|
||||
@@ -0,0 +1 @@
|
||||
{'files': ['x_ray_eye.py']}
|
||||
57
csaxs_bec/bec_widgets/widgets/xray_eye/x_ray_eye_plugin.py
Normal file
57
csaxs_bec/bec_widgets/widgets/xray_eye/x_ray_eye_plugin.py
Normal file
@@ -0,0 +1,57 @@
|
||||
# Copyright (C) 2022 The Qt Company Ltd.
|
||||
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye import XRayEye
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
<widget class='XRayEye' name='x_ray_eye'>
|
||||
</widget>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
|
||||
class XRayEyePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._form_editor = None
|
||||
|
||||
def createWidget(self, parent):
|
||||
if parent is None:
|
||||
return QWidget()
|
||||
t = XRayEye(parent)
|
||||
return t
|
||||
|
||||
def domXml(self):
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return ""
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(XRayEye.ICON_NAME)
|
||||
|
||||
def includeFile(self):
|
||||
return "x_ray_eye"
|
||||
|
||||
def initialize(self, form_editor):
|
||||
self._form_editor = form_editor
|
||||
|
||||
def isContainer(self):
|
||||
return False
|
||||
|
||||
def isInitialized(self):
|
||||
return self._form_editor is not None
|
||||
|
||||
def name(self):
|
||||
return "XRayEye"
|
||||
|
||||
def toolTip(self):
|
||||
return "XRayEye"
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
@@ -393,18 +393,35 @@ cam_flomni_overview:
|
||||
readOnly: false
|
||||
readoutPriority: on_request
|
||||
|
||||
# cam_flomni_xeye:
|
||||
# description: Camera flOMNI Xray eye ID101
|
||||
# deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
|
||||
# deviceConfig:
|
||||
# camera_ID: 101
|
||||
# bits_per_pixel: 24
|
||||
# channels: 3
|
||||
# m_n_colormode: 1
|
||||
# enabled: true
|
||||
# onFailure: buffer
|
||||
# readOnly: false
|
||||
# readoutPriority: async
|
||||
cam_xeye:
|
||||
description: Camera flOMNI Xray eye ID1
|
||||
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
|
||||
deviceConfig:
|
||||
camera_id: 1
|
||||
bits_per_pixel: 24
|
||||
num_rotation_90: 3
|
||||
transpose: false
|
||||
force_monochrome: true
|
||||
m_n_colormode: 1
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: async
|
||||
|
||||
cam_ids_rgb:
|
||||
description: Camera flOMNI Xray eye ID203
|
||||
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
|
||||
deviceConfig:
|
||||
camera_id: 203
|
||||
bits_per_pixel: 24
|
||||
num_rotation_90: 3
|
||||
transpose: false
|
||||
force_monochrome: true
|
||||
m_n_colormode: 1
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: async
|
||||
|
||||
|
||||
# ############################################################
|
||||
@@ -417,4 +434,35 @@ flomni_temphum:
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
# ############################################################
|
||||
# ########## OMNY / flOMNI / LamNI fast shutter ##############
|
||||
# ############################################################
|
||||
omnyfsh:
|
||||
description: omnyfsh connects to read fast shutter at X12 if in that network
|
||||
deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastEpicsShutter
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
############################################################
|
||||
#################### GUI Signals ###########################
|
||||
############################################################
|
||||
omny_xray_gui:
|
||||
description: Gui Epics signals
|
||||
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: on_request
|
||||
|
||||
calculated_signal:
|
||||
description: Calculated signal from alignment for fit
|
||||
deviceClass: ophyd_devices.ComputedSignal
|
||||
deviceConfig:
|
||||
compute_method: "def just_rand():\n return 42"
|
||||
enabled: true
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
@@ -37,8 +37,7 @@ import traceback
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import DeviceStatus
|
||||
from ophyd_devices import CompareStatus, TransitionStatus
|
||||
from ophyd_devices import CompareStatus, DeviceStatus, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
|
||||
|
||||
@@ -143,7 +143,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
|
||||
self._add_delay = add_delay or 0
|
||||
self._raise_states = raise_states or []
|
||||
super().__init__(
|
||||
device=signal,
|
||||
obj=signal,
|
||||
callback=self._compare_callback,
|
||||
timeout=timeout,
|
||||
settle_time=settle_time,
|
||||
|
||||
@@ -1 +1 @@
|
||||
from .ids_camera_new import IDSCamera
|
||||
from .ids_camera import IDSCamera
|
||||
|
||||
@@ -18,6 +18,7 @@ import atexit
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
import time
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error
|
||||
@@ -66,8 +67,8 @@ class IDSCameraObject:
|
||||
check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject")
|
||||
|
||||
if (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
):
|
||||
logger.info("Bayer color mode detected.")
|
||||
# setup the color depth to the current windows setting
|
||||
@@ -76,16 +77,16 @@ class IDSCameraObject:
|
||||
) # TODO This raises an error - maybe check the m_n_colormode value
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
|
||||
self.n_bits_per_pixel = self.ueye.INT(32)
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
self.m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
@@ -159,15 +160,17 @@ class Camera:
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
camera_id: int,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: int = 24,
|
||||
connect: bool = True,
|
||||
self,
|
||||
camera_id: int,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: int = 24,
|
||||
connect: bool = True,
|
||||
force_monochrome: bool = False,
|
||||
):
|
||||
self.ueye = ueye
|
||||
self.camera_id = camera_id
|
||||
self._inputs = {"m_n_colormode": m_n_colormode, "bits_per_pixel": bits_per_pixel}
|
||||
self.force_monochrome = force_monochrome
|
||||
self._connected = False
|
||||
self.cam = None
|
||||
atexit.register(self.on_disconnect)
|
||||
@@ -197,14 +200,16 @@ class Camera:
|
||||
self.cam = IDSCameraObject(self.camera_id, **self._inputs)
|
||||
self._connected = True
|
||||
|
||||
def on_disconnect(self):
|
||||
"""Disconnect from the camera."""
|
||||
def on_disconnect(self, delay_after: float = 0.3):
|
||||
"""Disconnect from the camera and optionally wait a short time for driver cleanup."""
|
||||
try:
|
||||
if self.cam and self.cam.h_cam:
|
||||
check_error(self.ueye.is_ExitCamera(self.cam.h_cam), "IDSCameraObject")
|
||||
self._connected = False
|
||||
self.cam = None
|
||||
logger.info("Camera disconnected.")
|
||||
if delay_after > 0:
|
||||
time.sleep(delay_after)
|
||||
logger.debug(f"Waited {delay_after:.2f}s after camera disconnect for cleanup.")
|
||||
except Exception as e:
|
||||
logger.info(f"Error during camera disconnection: {e}")
|
||||
|
||||
@@ -263,9 +268,19 @@ class Camera:
|
||||
if array is None:
|
||||
logger.error("Failed to get image data from the camera.")
|
||||
return None
|
||||
return np.reshape(
|
||||
img = np.reshape(
|
||||
array, (self.cam.height.value, self.cam.width.value, self.cam.bytes_per_pixel)
|
||||
)
|
||||
# If RGB image (H, W, 3), reshuffle channels from BGR → RGB
|
||||
if img.ndim == 3 and img.shape[2] == 3:
|
||||
img = img[:, :, ::-1]
|
||||
if self.force_monochrome:
|
||||
gray = np.dot(img[..., :3], [0.2989, 0.5870, 0.1140]).astype(np.uint8)
|
||||
# expand to 3D shape (H, W, 1) for consistency with real mono cams
|
||||
img = np.expand_dims(gray, axis=-1)
|
||||
img = np.ascontiguousarray(img)
|
||||
|
||||
return img
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,403 +1,225 @@
|
||||
"""IDS Camera class for cSAXS IDS cameras."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import messages
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, Kind, Signal, StatusBase
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.utils.bec_signals import PreviewSignal
|
||||
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from pydantic import ValidationInfo
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class ROISignal(Signal):
|
||||
"""
|
||||
Signal to handle the Region of Interest (ROI) for the IDS camera.
|
||||
It is a tuple of (x, y, width, height).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name,
|
||||
roi: tuple | None = None,
|
||||
value=0,
|
||||
dtype=None,
|
||||
shape=None,
|
||||
timestamp=None,
|
||||
parent=None,
|
||||
labels=None,
|
||||
kind=Kind.hinted,
|
||||
tolerance=None,
|
||||
rtolerance=None,
|
||||
metadata=None,
|
||||
cl=None,
|
||||
attr_name="",
|
||||
):
|
||||
super().__init__(
|
||||
name=name,
|
||||
value=value,
|
||||
dtype=dtype,
|
||||
shape=shape,
|
||||
timestamp=timestamp,
|
||||
parent=parent,
|
||||
labels=labels,
|
||||
kind=kind,
|
||||
tolerance=tolerance,
|
||||
rtolerance=rtolerance,
|
||||
metadata=metadata,
|
||||
cl=cl,
|
||||
attr_name=attr_name,
|
||||
)
|
||||
self.roi = roi
|
||||
|
||||
def get(self, **kwargs):
|
||||
image = self.parent.image_data.get().data
|
||||
if not isinstance(image, np.ndarray):
|
||||
return -1 # -1 if no valid image is available
|
||||
|
||||
if self.roi is None:
|
||||
roi = (0, 0, image.shape[1], image.shape[0])
|
||||
else:
|
||||
roi = self.roi
|
||||
if len(image.shape) > 2:
|
||||
image = np.sum(image, axis=2) # Convert to grayscale if it's a color image
|
||||
return np.sum(image[roi[1] : roi[1] + roi[3], roi[0] : roi[0] + roi[2]], (0, 1))
|
||||
|
||||
|
||||
class IDSCamera(PSIDeviceBase):
|
||||
""" "
|
||||
#---------------------------------------------------------------------------------------------------------------------------------------
|
||||
"""IDS Camera class for cSAXS.
|
||||
|
||||
#Variables
|
||||
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
|
||||
sInfo = ueye.SENSORINFO()
|
||||
cInfo = ueye.CAMINFO()
|
||||
pcImageMemory = ueye.c_mem_p()
|
||||
MemID = ueye.int()
|
||||
rectAOI = ueye.IS_RECT()
|
||||
pitch = ueye.INT()
|
||||
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
|
||||
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
|
||||
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
|
||||
bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
|
||||
ids_cam
|
||||
...
|
||||
This class inherits from PSIDeviceBase and implements the necessary methods
|
||||
to interact with the IDS camera using the pyueye library.
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["start_live_mode", "stop_live_mode", "set_roi", "width", "height"]
|
||||
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.", num_rotation_90=0,
|
||||
transpose=False)
|
||||
roi_signal = Cpt(
|
||||
AsyncSignal,
|
||||
name="roi_signal",
|
||||
ndim=0,
|
||||
max_size=1000,
|
||||
doc="Signal for the region of interest (ROI).",
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
)
|
||||
|
||||
image_data = Cpt(PreviewSignal, ndim=2, kind=Kind.omitted)
|
||||
# roi_bot_left = Cpt(ROISignal, roi=(400, 525, 118, 105), kind=Kind.normal)
|
||||
# roi_bot_right = Cpt(ROISignal, roi=(518, 525, 118, 105), kind=Kind.normal)
|
||||
# roi_top_left = Cpt(ROISignal, roi=(400, 630, 118, 105), kind=Kind.normal)
|
||||
# roi_top_right = Cpt(ROISignal, roi=(518, 630, 118, 105), kind=Kind.normal)
|
||||
# roi_signal = Cpt(ROISignal, kind=Kind.normal, doc="Region of Interest signal")
|
||||
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
prefix="",
|
||||
*,
|
||||
name: str,
|
||||
camera_ID: int,
|
||||
bits_per_pixel: int,
|
||||
channels: int,
|
||||
m_n_colormode: int,
|
||||
kind=None,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
camera_id: int,
|
||||
prefix: str = "",
|
||||
scan_info: ScanInfo | None = None,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: Literal[8, 24] = 24,
|
||||
live_mode: bool = False,
|
||||
num_rotation_90: int = 0,
|
||||
transpose: bool = False,
|
||||
force_monochrome: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the IDS Camera.
|
||||
|
||||
super().__init__(
|
||||
prefix=prefix, name=name, kind=kind, device_manager=device_manager, **kwargs
|
||||
)
|
||||
self.camera_ID = camera_ID
|
||||
self.bits_per_pixel = bits_per_pixel
|
||||
self.bytes_per_pixel = None
|
||||
self.channels = channels
|
||||
self._m_n_colormode_input = m_n_colormode
|
||||
self.m_n_colormode = None
|
||||
self.ueye = ueye
|
||||
self.h_cam = None
|
||||
self.s_info = None
|
||||
self.data_thread = None
|
||||
self.c_info = None
|
||||
self.pc_image_memory = None
|
||||
self.mem_id = None
|
||||
self.rect_aoi = None
|
||||
self.pitch = None
|
||||
self.n_bits_per_pixel = None
|
||||
self.width = None
|
||||
self.height = None
|
||||
self.thread_event = threading.Event()
|
||||
self.data_thread = None
|
||||
self._roi: tuple | None = None # x, y, width, height
|
||||
logger.info(
|
||||
f"Deprecation warning: The IDSCamera class is deprecated. Use the new IDSCameraNew class instead."
|
||||
Args:
|
||||
name (str): Name of the device.
|
||||
camera_id (int): The ID of the camera device.
|
||||
prefix (str): Prefix for the device.
|
||||
scan_info (ScanInfo | None): Scan information for the device.
|
||||
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
|
||||
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
|
||||
live_mode (bool): Whether to enable live mode for the camera.
|
||||
"""
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self._live_mode_thread: threading.Thread | None = None
|
||||
self._stop_live_mode_event: threading.Event = threading.Event()
|
||||
self.cam = Camera(
|
||||
camera_id=camera_id,
|
||||
m_n_colormode=m_n_colormode,
|
||||
bits_per_pixel=bits_per_pixel,
|
||||
connect=False,
|
||||
)
|
||||
self._live_mode = False
|
||||
self._inputs = {"live_mode": live_mode}
|
||||
self._mask = np.zeros((1, 1), dtype=np.uint8)
|
||||
self.image.num_rotation_90 = num_rotation_90
|
||||
self.image.transpose = transpose
|
||||
self._force_monochrome = force_monochrome
|
||||
|
||||
def set_roi(self, x: int, y: int, width: int, height: int):
|
||||
self._roi = (x, y, width, height)
|
||||
############## Live Mode Methods ##############
|
||||
|
||||
def start_backend(self):
|
||||
if self.ueye is None:
|
||||
raise ImportError("The pyueye library is not installed.")
|
||||
self.h_cam = self.ueye.HIDS(
|
||||
self.camera_ID
|
||||
) # 0: first available camera; 1-254: The camera with the specified camera ID
|
||||
self.s_info = self.ueye.SENSORINFO()
|
||||
self.c_info = self.ueye.CAMINFO()
|
||||
self.pc_image_memory = self.ueye.c_mem_p()
|
||||
self.mem_id = self.ueye.int()
|
||||
self.rect_aoi = self.ueye.IS_RECT()
|
||||
self.pitch = self.ueye.INT()
|
||||
self.n_bits_per_pixel = self.ueye.INT(
|
||||
self.bits_per_pixel
|
||||
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
|
||||
self.m_n_colormode = self.ueye.INT(
|
||||
self._m_n_colormode_input
|
||||
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
@property
|
||||
def mask(self) -> np.ndarray:
|
||||
"""Return the current region of interest (ROI) for the camera."""
|
||||
return self._mask
|
||||
|
||||
# Starts the driver and establishes the connection to the camera
|
||||
ret = self.ueye.is_InitCamera(self.h_cam, None)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_InitCamera ERROR")
|
||||
@mask.setter
|
||||
def mask(self, value: np.ndarray):
|
||||
"""
|
||||
Set the region of interest (ROI) for the camera.
|
||||
|
||||
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that c_info points to
|
||||
ret = self.ueye.is_GetCameraInfo(self.h_cam, self.c_info)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_GetCameraInfo ERROR")
|
||||
Args:
|
||||
value (np.ndarray): The mask to set as the ROI.
|
||||
"""
|
||||
if value.ndim != 2:
|
||||
raise ValueError("ROI mask must be a 2D array.")
|
||||
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
|
||||
if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]:
|
||||
raise ValueError(
|
||||
f"ROI mask shape {value.shape} does not match image shape {img_shape}."
|
||||
)
|
||||
self._mask = value
|
||||
|
||||
# You can query additional information about the sensor type used in the camera
|
||||
ret = self.ueye.is_GetSensorInfo(self.h_cam, self.s_info)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_GetSensorInfo ERROR")
|
||||
@property
|
||||
def live_mode(self) -> bool:
|
||||
"""Return whether the camera is in live mode."""
|
||||
return self._live_mode
|
||||
|
||||
ret = self.ueye.is_ResetToDefault(self.h_cam)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_ResetToDefault ERROR")
|
||||
|
||||
# Set display mode to DIB
|
||||
ret = self.ueye.is_SetDisplayMode(self.h_cam, self.ueye.IS_SET_DM_DIB)
|
||||
|
||||
# Set the right color mode
|
||||
if (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
):
|
||||
# setup the color depth to the current windows setting
|
||||
self.ueye.is_GetColorDepth(self.h_cam, self.n_bits_per_pixel, self.m_n_colormode)
|
||||
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
print("IS_COLORMODE_BAYER: ")
|
||||
print("\tm_n_colormode: \t\t", self.m_n_colormode)
|
||||
print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
|
||||
n_bits_per_pixel = self.ueye.INT(32)
|
||||
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
print("IS_COLORMODE_CBYCRY: ")
|
||||
print("\tm_n_colormode: \t\t", m_n_colormode)
|
||||
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
n_bits_per_pixel = self.ueye.INT(8)
|
||||
bytes_per_pixel = int(n_bits_per_pixel / 8)
|
||||
print("IS_COLORMODE_MONOCHROME: ")
|
||||
print("\tm_n_colormode: \t\t", m_n_colormode)
|
||||
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
else:
|
||||
# for monochrome camera models use Y8 mode
|
||||
m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
n_bits_per_pixel = self.ueye.INT(8)
|
||||
bytes_per_pixel = int(n_bits_per_pixel / 8)
|
||||
print("else")
|
||||
|
||||
# Can be used to set the size and position of an "area of interest"(AOI) within an image
|
||||
ret = self.ueye.is_AOI(
|
||||
self.h_cam,
|
||||
self.ueye.IS_AOI_IMAGE_GET_AOI,
|
||||
self.rect_aoi,
|
||||
self.ueye.sizeof(self.rect_aoi),
|
||||
)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_AOI ERROR")
|
||||
|
||||
self.width = self.rect_aoi.s32Width
|
||||
self.height = self.rect_aoi.s32Height
|
||||
|
||||
# Prints out some information about the camera and the sensor
|
||||
print("Camera model:\t\t", self.s_info.strSensorName.decode("utf-8"))
|
||||
print("Camera serial no.:\t", self.c_info.SerNo.decode("utf-8"))
|
||||
print("Maximum image width:\t", self.width)
|
||||
print("Maximum image height:\t", self.height)
|
||||
print()
|
||||
|
||||
# ---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by n_bits_per_pixel
|
||||
ret = self.ueye.is_AllocImageMem(
|
||||
self.h_cam,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pc_image_memory,
|
||||
self.mem_id,
|
||||
)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_AllocImageMem ERROR")
|
||||
else:
|
||||
# Makes the specified image memory the active memory
|
||||
ret = self.ueye.is_SetImageMem(self.h_cam, self.pc_image_memory, self.mem_id)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_SetImageMem ERROR")
|
||||
@live_mode.setter
|
||||
def live_mode(self, value: bool):
|
||||
"""Set the live mode for the camera."""
|
||||
if value != self._live_mode:
|
||||
if self.cam._connected is False: # $ pylint: disable=protected-access
|
||||
self.cam.on_connect()
|
||||
self._live_mode = value
|
||||
if value:
|
||||
self._start_live()
|
||||
else:
|
||||
# Set the desired color mode
|
||||
ret = self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode)
|
||||
self._stop_live()
|
||||
|
||||
# Activates the camera's live video mode (free run mode)
|
||||
ret = self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_CaptureVideo ERROR")
|
||||
def set_rect_roi(self, x: int, y: int, width: int, height: int):
|
||||
"""Set the rectangular region of interest (ROI) for the camera."""
|
||||
if x < 0 or y < 0 or width <= 0 or height <= 0:
|
||||
raise ValueError("ROI coordinates and dimensions must be positive integers.")
|
||||
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
|
||||
if x + width > img_shape[1] or y + height > img_shape[0]:
|
||||
raise ValueError("ROI exceeds camera dimensions.")
|
||||
mask = np.zeros(img_shape, dtype=np.uint8)
|
||||
mask[y: y + height, x: x + width] = 1
|
||||
self.mask = mask
|
||||
|
||||
# Enables the queue mode for existing image memory sequences
|
||||
ret = self.ueye.is_InquireImageMem(
|
||||
self.h_cam,
|
||||
self.pc_image_memory,
|
||||
self.mem_id,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pitch,
|
||||
def _start_live(self):
|
||||
"""Start the live mode for the camera."""
|
||||
if self._live_mode_thread is not None:
|
||||
logger.info("Live mode thread is already running.")
|
||||
return
|
||||
self._stop_live_mode_event.clear()
|
||||
self._live_mode_thread = threading.Thread(
|
||||
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
|
||||
)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_InquireImageMem ERROR")
|
||||
self._live_mode_thread.start()
|
||||
|
||||
def _stop_live(self):
|
||||
"""Stop the live mode for the camera."""
|
||||
if self._live_mode_thread is None:
|
||||
logger.info("Live mode thread is not running.")
|
||||
return
|
||||
self._stop_live_mode_event.set()
|
||||
self._live_mode_thread.join(timeout=5)
|
||||
if self._live_mode_thread.is_alive():
|
||||
logger.warning("Live mode thread did not stop gracefully.")
|
||||
else:
|
||||
print("Press q to leave the programm")
|
||||
# startmeasureframerate = True
|
||||
# Gain = False
|
||||
self._live_mode_thread = None
|
||||
logger.info("Live mode stopped.")
|
||||
|
||||
# Start live mode of camera immediately
|
||||
self.start_live_mode()
|
||||
def _live_mode_loop(self, stop_event: threading.Event):
|
||||
"""Loop to capture images in live mode."""
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
self.process_data(self.cam.get_image_data())
|
||||
except Exception as e:
|
||||
logger.error(f"Error in live mode loop: {e}")
|
||||
break
|
||||
stop_event.wait(0.2) # 5 Hz
|
||||
|
||||
def _start_data_thread(self):
|
||||
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
|
||||
self.data_thread.start()
|
||||
def process_data(self, image: np.ndarray | None):
|
||||
"""Process the image data before sending it to the preview signal."""
|
||||
if image is None:
|
||||
return
|
||||
self.image.put(image)
|
||||
|
||||
def _receive_data_from_camera(self):
|
||||
while not self.thread_event.is_set():
|
||||
if self.ueye is None:
|
||||
print("pyueye library not available.")
|
||||
def get_last_image(self) -> np.ndarray:
|
||||
"""Get the last captured image from the camera."""
|
||||
image = self.image.get()
|
||||
if image:
|
||||
return image.data
|
||||
|
||||
############## User Interface Methods ##############
|
||||
|
||||
def on_connected(self):
|
||||
"""Connect to the camera."""
|
||||
self.cam.force_monochrome = self._force_monochrome
|
||||
self.cam.on_connect()
|
||||
self.live_mode = self._inputs.get("live_mode", False)
|
||||
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
|
||||
|
||||
def on_destroy(self):
|
||||
"""Clean up resources when the device is destroyed."""
|
||||
self.cam.on_disconnect()
|
||||
super().on_destroy()
|
||||
|
||||
def on_trigger(self):
|
||||
"""Handle the trigger event."""
|
||||
if not self.live_mode:
|
||||
return
|
||||
image = self.image.get()
|
||||
if image is not None:
|
||||
image: messages.DevicePreviewMessage
|
||||
if self.mask.shape[0:2] != image.data.shape[0:2]:
|
||||
logger.info(
|
||||
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
|
||||
)
|
||||
return
|
||||
# In order to display the image in an OpenCV window we need to...
|
||||
# ...extract the data of our image memory
|
||||
array = self.ueye.get_data(
|
||||
self.pc_image_memory,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pitch,
|
||||
copy=False,
|
||||
)
|
||||
|
||||
# ...reshape it in an numpy array...
|
||||
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
|
||||
self.image_data.put(frame)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
def wait_for_connection(self, all_signals=False, timeout=10):
|
||||
if ueye is None:
|
||||
raise ImportError(
|
||||
"The pyueye library is not installed or doesn't provide the necessary c libs"
|
||||
)
|
||||
super().wait_for_connection(all_signals, timeout)
|
||||
|
||||
def start_live_mode(self):
|
||||
if self.data_thread is not None:
|
||||
self.stop_live_mode()
|
||||
self._start_data_thread()
|
||||
|
||||
def stop_live_mode(self):
|
||||
"""Stopping the camera live mode."""
|
||||
self.thread_event.set()
|
||||
if self.data_thread is not None:
|
||||
self.data_thread.join()
|
||||
self.thread_event.clear()
|
||||
self.data_thread = None
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No signals are connected at this point. If you like to
|
||||
set default values on signals, please use on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
self.start_backend()
|
||||
self.start_live_mode()
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device."""
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
|
||||
def on_destroy(self) -> None:
|
||||
"""Called when the device is destroyed. Cleanup resources here."""
|
||||
self.stop_live_mode()
|
||||
if len(image.data.shape) == 3:
|
||||
# If the image has multiple channels, apply the mask to each channel
|
||||
data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data
|
||||
n_channels = 3
|
||||
else:
|
||||
data = image.data * self.mask
|
||||
n_channels = 1
|
||||
self.roi_signal.put(np.sum(data) / (np.sum(self.mask) * n_channels))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example usage
|
||||
camera = IDSCamera(name="camera", camera_ID=201, bits_per_pixel=24, channels=3, m_n_colormode=1)
|
||||
camera.wait_for_connection()
|
||||
|
||||
camera.on_destroy()
|
||||
# Example usage of the IDSCamera class
|
||||
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
|
||||
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")
|
||||
|
||||
@@ -1,226 +0,0 @@
|
||||
"""IDS Camera class for cSAXS IDS cameras."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import messages
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from pydantic import ValidationInfo
|
||||
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class IDSCamera(PSIDeviceBase):
|
||||
"""IDS Camera class for cSAXS.
|
||||
|
||||
This class inherits from PSIDeviceBase and implements the necessary methods
|
||||
to interact with the IDS camera using the pyueye library.
|
||||
"""
|
||||
|
||||
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.")
|
||||
roi_signal = Cpt(
|
||||
AsyncSignal,
|
||||
name="roi_signal",
|
||||
ndim=0,
|
||||
max_size=1000,
|
||||
doc="Signal for the region of interest (ROI).",
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
)
|
||||
|
||||
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
camera_id: int,
|
||||
prefix: str = "",
|
||||
scan_info: ScanInfo | None = None,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: Literal[8, 24] = 24,
|
||||
live_mode: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the IDS Camera.
|
||||
|
||||
Args:
|
||||
name (str): Name of the device.
|
||||
camera_id (int): The ID of the camera device.
|
||||
prefix (str): Prefix for the device.
|
||||
scan_info (ScanInfo | None): Scan information for the device.
|
||||
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
|
||||
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
|
||||
live_mode (bool): Whether to enable live mode for the camera.
|
||||
"""
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self._live_mode_thread: threading.Thread | None = None
|
||||
self._stop_live_mode_event: threading.Event = threading.Event()
|
||||
self.cam = Camera(
|
||||
camera_id=camera_id,
|
||||
m_n_colormode=m_n_colormode,
|
||||
bits_per_pixel=bits_per_pixel,
|
||||
connect=False,
|
||||
)
|
||||
self._live_mode = False
|
||||
self._inputs = {"live_mode": live_mode}
|
||||
self._mask = np.zeros((1, 1), dtype=np.uint8)
|
||||
|
||||
############## Live Mode Methods ##############
|
||||
|
||||
@property
|
||||
def mask(self) -> np.ndarray:
|
||||
"""Return the current region of interest (ROI) for the camera."""
|
||||
return self._mask
|
||||
|
||||
@mask.setter
|
||||
def mask(self, value: np.ndarray):
|
||||
"""
|
||||
Set the region of interest (ROI) for the camera.
|
||||
|
||||
Args:
|
||||
value (np.ndarray): The mask to set as the ROI.
|
||||
"""
|
||||
if value.ndim != 2:
|
||||
raise ValueError("ROI mask must be a 2D array.")
|
||||
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
|
||||
if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]:
|
||||
raise ValueError(
|
||||
f"ROI mask shape {value.shape} does not match image shape {img_shape}."
|
||||
)
|
||||
self._mask = value
|
||||
|
||||
@property
|
||||
def live_mode(self) -> bool:
|
||||
"""Return whether the camera is in live mode."""
|
||||
return self._live_mode
|
||||
|
||||
@live_mode.setter
|
||||
def live_mode(self, value: bool):
|
||||
"""Set the live mode for the camera."""
|
||||
if value != self._live_mode:
|
||||
if self.cam._connected is False: # $ pylint: disable=protected-access
|
||||
self.cam.on_connect()
|
||||
self._live_mode = value
|
||||
if value:
|
||||
self._start_live()
|
||||
else:
|
||||
self._stop_live()
|
||||
|
||||
def set_rect_roi(self, x: int, y: int, width: int, height: int):
|
||||
"""Set the rectangular region of interest (ROI) for the camera."""
|
||||
if x < 0 or y < 0 or width <= 0 or height <= 0:
|
||||
raise ValueError("ROI coordinates and dimensions must be positive integers.")
|
||||
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
|
||||
if x + width > img_shape[1] or y + height > img_shape[0]:
|
||||
raise ValueError("ROI exceeds camera dimensions.")
|
||||
mask = np.zeros(img_shape, dtype=np.uint8)
|
||||
mask[y : y + height, x : x + width] = 1
|
||||
self.mask = mask
|
||||
|
||||
def _start_live(self):
|
||||
"""Start the live mode for the camera."""
|
||||
if self._live_mode_thread is not None:
|
||||
logger.info("Live mode thread is already running.")
|
||||
return
|
||||
self._stop_live_mode_event.clear()
|
||||
self._live_mode_thread = threading.Thread(
|
||||
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
|
||||
)
|
||||
self._live_mode_thread.start()
|
||||
|
||||
def _stop_live(self):
|
||||
"""Stop the live mode for the camera."""
|
||||
if self._live_mode_thread is None:
|
||||
logger.info("Live mode thread is not running.")
|
||||
return
|
||||
self._stop_live_mode_event.set()
|
||||
self._live_mode_thread.join(timeout=5)
|
||||
if self._live_mode_thread.is_alive():
|
||||
logger.warning("Live mode thread did not stop gracefully.")
|
||||
else:
|
||||
self._live_mode_thread = None
|
||||
logger.info("Live mode stopped.")
|
||||
|
||||
def _live_mode_loop(self, stop_event: threading.Event):
|
||||
"""Loop to capture images in live mode."""
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
self.process_data(self.cam.get_image_data())
|
||||
except Exception as e:
|
||||
logger.error(f"Error in live mode loop: {e}")
|
||||
break
|
||||
stop_event.wait(0.2) # 5 Hz
|
||||
|
||||
def process_data(self, image: np.ndarray | None):
|
||||
"""Process the image data before sending it to the preview signal."""
|
||||
if image is None:
|
||||
return
|
||||
self.image.put(image)
|
||||
|
||||
def get_last_image(self) -> np.ndarray:
|
||||
"""Get the last captured image from the camera."""
|
||||
image = self.image.get()
|
||||
if image:
|
||||
return image.data
|
||||
|
||||
############## User Interface Methods ##############
|
||||
|
||||
def on_connected(self):
|
||||
"""Connect to the camera."""
|
||||
self.cam.on_connect()
|
||||
self.live_mode = self._inputs.get("live_mode", False)
|
||||
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
|
||||
|
||||
def on_destroy(self):
|
||||
"""Clean up resources when the device is destroyed."""
|
||||
self.cam.on_disconnect()
|
||||
super().on_destroy()
|
||||
|
||||
def on_trigger(self):
|
||||
"""Handle the trigger event."""
|
||||
if not self.live_mode:
|
||||
return
|
||||
image = self.image.get()
|
||||
if image is not None:
|
||||
image: messages.DevicePreviewMessage
|
||||
if self.mask.shape[0:2] != image.data.shape[0:2]:
|
||||
logger.info(
|
||||
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
|
||||
)
|
||||
return
|
||||
|
||||
if len(image.data.shape) == 3:
|
||||
# If the image has multiple channels, apply the mask to each channel
|
||||
data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data
|
||||
n_channels = 3
|
||||
else:
|
||||
data = image.data * self.mask
|
||||
n_channels = 1
|
||||
self.roi_signal.put(
|
||||
{
|
||||
self.roi_signal.name: {
|
||||
"value": np.sum(data)
|
||||
/ (np.sum(self.mask) * n_channels), # TODO could be optimized
|
||||
"timestamp": time.time(),
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example usage of the IDSCamera class
|
||||
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
|
||||
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")
|
||||
82
csaxs_bec/devices/omny/shutter.py
Normal file
82
csaxs_bec/devices/omny/shutter.py
Normal file
@@ -0,0 +1,82 @@
|
||||
import time
|
||||
import socket
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device
|
||||
from ophyd import EpicsSignal
|
||||
|
||||
|
||||
class OMNYFastEpicsShutterError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _detect_host_pv():
|
||||
"""Detect host subnet and return appropriate PV name."""
|
||||
try:
|
||||
hostname = socket.gethostname()
|
||||
local_ip = socket.gethostbyname(hostname)
|
||||
if local_ip.startswith("129.129.122."):
|
||||
return "X12SA-ES1-TTL:OUT_01"
|
||||
else:
|
||||
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
|
||||
except Exception as ex:
|
||||
print(f"Warning: could not detect IP subnet ({ex}), using dummy shutter.")
|
||||
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
|
||||
|
||||
|
||||
class OMNYFastEpicsShutter(Device):
|
||||
"""
|
||||
Fast EPICS shutter with automatic PV selection based on host subnet.
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help"]
|
||||
SUB_VALUE = "value"
|
||||
_default_sub = SUB_VALUE
|
||||
|
||||
# PV is detected dynamically at import time
|
||||
shutter = Cpt(EpicsSignal, name="shutter", read_pv=_detect_host_pv(), auto_monitor=True)
|
||||
|
||||
def __init__(self, prefix="", *, name, **kwargs):
|
||||
super().__init__(prefix, name=name, **kwargs)
|
||||
self.shutter.subscribe(self._emit_value)
|
||||
|
||||
def _emit_value(self, **kwargs):
|
||||
timestamp = kwargs.pop("timestamp", time.time())
|
||||
self.wait_for_connection()
|
||||
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
|
||||
|
||||
# -----------------------------------------------------
|
||||
# User-facing shutter control functions
|
||||
# -----------------------------------------------------
|
||||
|
||||
def fshopen(self):
|
||||
"""Open the fast shutter."""
|
||||
try:
|
||||
self.shutter.put(1, wait=True)
|
||||
except Exception as ex:
|
||||
raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}")
|
||||
|
||||
def fshclose(self):
|
||||
"""Close the fast shutter."""
|
||||
try:
|
||||
self.shutter.put(0, wait=True)
|
||||
except Exception as ex:
|
||||
raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}")
|
||||
|
||||
def fshstatus(self):
|
||||
"""Return the fast shutter status (0=closed, 1=open)."""
|
||||
try:
|
||||
return self.shutter.get()
|
||||
except Exception as ex:
|
||||
raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}")
|
||||
|
||||
def fshinfo(self):
|
||||
"""Print information about which EPICS PV channel is being used."""
|
||||
pvname = self.shutter.pvname
|
||||
print(f"Fast shutter connected to EPICS channel: {pvname}")
|
||||
return pvname
|
||||
|
||||
def help(self):
|
||||
"""Display available user methods."""
|
||||
print("Available methods:")
|
||||
for method in self.USER_ACCESS:
|
||||
print(f" - {method}")
|
||||
74
csaxs_bec/devices/omny/xray_epics_gui.py
Normal file
74
csaxs_bec/devices/omny/xray_epics_gui.py
Normal file
@@ -0,0 +1,74 @@
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device
|
||||
from ophyd import DynamicDeviceComponent as Dcpt
|
||||
from ophyd import EpicsSignal
|
||||
|
||||
|
||||
|
||||
class OMNYXRayEpicsGUI(Device):
|
||||
|
||||
save_frame = Cpt(
|
||||
EpicsSignal, name="save_frame", read_pv="XOMNYI-XEYE-SAVFRAME:0",auto_monitor=True
|
||||
)
|
||||
update_frame_acqdone = Cpt(
|
||||
EpicsSignal, name="update_frame_acqdone", read_pv="XOMNYI-XEYE-ACQDONE:0",auto_monitor=True
|
||||
)
|
||||
update_frame_acq = Cpt(
|
||||
EpicsSignal, name="update_frame_acq", read_pv="XOMNYI-XEYE-ACQ:0",auto_monitor=True
|
||||
)
|
||||
width_y_dynamic = {
|
||||
f"width_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YWIDTH_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
|
||||
}
|
||||
width_y = Dcpt(width_y_dynamic)
|
||||
width_x_dynamic = {
|
||||
f"width_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XWIDTH_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
|
||||
}
|
||||
width_x = Dcpt(width_x_dynamic)
|
||||
enable_mv_x = Cpt(
|
||||
EpicsSignal, name="enable_mv_x", read_pv="XOMNYI-XEYE-ENAMVX:0",auto_monitor=True
|
||||
)
|
||||
enable_mv_y = Cpt(
|
||||
EpicsSignal, name="enable_mv_y", read_pv="XOMNYI-XEYE-ENAMVY:0",auto_monitor=True
|
||||
)
|
||||
send_message = Cpt(
|
||||
EpicsSignal, name="send_message", read_pv="XOMNYI-XEYE-MESSAGE:0.DESC",auto_monitor=True
|
||||
)
|
||||
sample_name = Cpt(
|
||||
EpicsSignal, name="sample_name", read_pv="XOMNYI-XEYE-SAMPLENAME:0.DESC",auto_monitor=True
|
||||
)
|
||||
angle = Cpt(
|
||||
EpicsSignal, name="angle", read_pv="XOMNYI-XEYE-ANGLE:0",auto_monitor=True
|
||||
)
|
||||
pixel_size = Cpt(
|
||||
EpicsSignal, name="pixel_size", read_pv="XOMNYI-XEYE-PIXELSIZE:0",auto_monitor=True
|
||||
)
|
||||
submit = Cpt(
|
||||
EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True
|
||||
)
|
||||
step = Cpt(
|
||||
EpicsSignal, name="step", read_pv="XOMNYI-XEYE-STEP:0",auto_monitor=True
|
||||
)
|
||||
xval_x_dynamic = {
|
||||
f"xval_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XVAL_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
|
||||
}
|
||||
xval_x = Dcpt(xval_x_dynamic)
|
||||
yval_y_dynamic = {
|
||||
f"yval_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YVAL_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
|
||||
}
|
||||
yval_y = Dcpt(yval_y_dynamic)
|
||||
recbg = Cpt(
|
||||
EpicsSignal, name="recbg", read_pv="XOMNYI-XEYE-RECBG:0",auto_monitor=True
|
||||
)
|
||||
stage_pos_x_dynamic = {
|
||||
f"stage_pos_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-STAGEPOSX:{i}", {"auto_monitor": True}) for i in range(1, 6)
|
||||
}
|
||||
stage_pos_x = Dcpt(stage_pos_x_dynamic)
|
||||
mvx = Cpt(
|
||||
EpicsSignal, name="mvx", read_pv="XOMNYI-XEYE-MVX:0",auto_monitor=True
|
||||
)
|
||||
mvy = Cpt(
|
||||
EpicsSignal, name="mvy", read_pv="XOMNYI-XEYE-MVY:0",auto_monitor=True
|
||||
)
|
||||
|
||||
|
||||
@@ -88,6 +88,14 @@ Depending on the tomo mode following parameters can be given to the `flomni.tomo
|
||||
| Golden ratio tomography (sorted in bunches) | projection_number=None |
|
||||
| Equally spaced with golden starting angle | projection_number=None |
|
||||
|
||||
### GUI tools
|
||||
|
||||
During operation the BEC GUI will show the relevant cameras or progress information. To manually switch view TAB completion on 'flomni.flomnigui_' will show all options to control the GUI. Most useful
|
||||
'flomni.flomnigui_show_cameras()' will show the cameras for sample transfer and interior overview
|
||||
'flomni.flomnigui_show_progress()' will show the measurement progress GUI
|
||||
'flomnigui_show_xeyealign()' will show the XrayEye alignment GUI
|
||||
|
||||
|
||||
## How to setup flOMNI (software)
|
||||
|
||||
This part of the manual is intended for beamline staff and expert users
|
||||
@@ -218,9 +226,15 @@ Update the values by, example for feyex and in position,
|
||||
To refresh the frame of the xray eye windows software
|
||||
`flomni.xrayeye_update_frame()`
|
||||
|
||||
This command can also be called to keep the shutter open and live view active
|
||||
`flomni.xrayeye_update_frame(keep_shutter_open=True)`
|
||||
|
||||
To start the xray eye alignment (and clear any previous alignment)
|
||||
`flomni.xrayeye_alignment_start()`
|
||||
|
||||
This command can also be called to keep the shutter open and live view active. Warning: The dose to the sample will be significantly higher.
|
||||
`flomni.xrayeye_update_frame(keep_shutter_open=True)`
|
||||
|
||||
To load the fit parameters from directory _dir_path_ computed by _SPEC_ptycho_align.m_ in Matlab run
|
||||
`flomni.read_alignment_offset(dir_path='')`
|
||||
The loading routine uses default values for the vertical alignment. This behavior can be changed (e.g. for getting new default values) by the parameter `use_vertical_default_values=False`.
|
||||
|
||||
@@ -16,7 +16,7 @@ dependencies = [
|
||||
"bec_ipython_client",
|
||||
"bec_lib",
|
||||
"bec_server",
|
||||
"ophyd_devices",
|
||||
"ophyd_devices~=1.29",
|
||||
"std_daq_client",
|
||||
"jfjoch-client",
|
||||
"rich",
|
||||
|
||||
@@ -5,7 +5,7 @@ from unittest import mock
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.ids_camera_new import IDSCamera
|
||||
from csaxs_bec.devices.ids_cameras.ids_camera import IDSCamera
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
|
||||
Reference in New Issue
Block a user