Compare commits

..

3 Commits

Author SHA1 Message Date
x12sa
79d2071094 correct epics channel for user motor and add direction_of_travel parameter in device config file
Some checks failed
CI for csaxs_bec / test (push) Failing after 34s
2025-10-01 16:01:14 +02:00
x12sa
d8d2417679 add motors_copy_paste.yaml for user motors in experimental hutch
Some checks failed
CI for csaxs_bec / test (push) Failing after 31s
2025-09-30 17:07:40 +02:00
x12sa
8aa0a52bda adding energy motor for double nultilayer mono, for the moment just commented
Some checks failed
CI for csaxs_bec / test (push) Failing after 40s
2025-09-22 16:26:56 +02:00
34 changed files with 818 additions and 1887 deletions

View File

@@ -47,13 +47,25 @@ jobs:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
- name: Checkout BEC Core
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec
uses: actions/checkout@v4
with:
repository: bec/bec
ref: "${{ inputs.BEC_CORE_BRANCH || 'main' }}"
path: ./bec
- name: Checkout Ophyd Devices
run: git clone --depth 1 --branch "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}" https://github.com/bec-project/ophyd_devices.git ./ophyd_devices
uses: actions/checkout@v4
with:
repository: bec/ophyd_devices
ref: "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}"
path: ./ophyd_devices
- name: Checkout BEC Widgets
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets
uses: actions/checkout@v4
with:
repository: bec/bec_widgets
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
path: ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4

View File

@@ -14,8 +14,6 @@ from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_optics_mixin import FlomniOpticsMixin
from csaxs_bec.bec_ipython_client.plugins.flomni.x_ray_eye_align import XrayEyeAlign
from csaxs_bec.bec_ipython_client.plugins.flomni.gui_tools import flomniGuiTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
logger = bec_logger.logger
@@ -26,57 +24,27 @@ if builtins.__dict__.get("bec") is not None:
umvr = builtins.__dict__.get("umvr")
class FlomniToolsError(Exception):
pass
class FlomniInitError(Exception):
pass
class FlomniError(Exception):
pass
class FlomniTools:
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
if autoconfirm and default == "y":
self.printgreen(message + " Automatically confirming default: yes")
return True
elif autoconfirm and default == "n":
self.printgreen(message + " Automatically confirming default: no")
return False
if default == "y":
message_ending = " [Y]/n? "
elif default == "n":
message_ending = " y/[N]? "
else:
message_ending = " y/n? "
while True:
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
if (
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
) or (default == "y" and user_input == ""):
return True
if (
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
) or (default == "n" and user_input == ""):
return False
else:
print("Please expicitely confirm y or n.")
class FlomniInitStagesMixin:
def flomni_init_stages(self):
if self.OMNYTools.yesno("Starting initialization of flOMNI stages. OK?"):
user_input = input("Starting initialization of flOMNI stages. OK? [y/n]")
if user_input == "y":
print("staring...")
else:
return
if self.check_all_axes_of_fomni_referenced():
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
user_input = input("Continue anyways? [y/n]")
if user_input == "y":
print("ok then...")
else:
return
@@ -106,8 +74,10 @@ class FlomniInitStagesMixin:
dev.feyex.limits = [-30, -1]
print("done")
if self.OMNYTools.yesno("Init of foptz. Can the stage move to the upstream limit without collision?"):
user_input = input(
"Init of foptz. Can the stage move to the upstream limit without collision? [y/n]"
)
if user_input == "y":
print("good then")
else:
return
@@ -161,7 +131,10 @@ class FlomniInitStagesMixin:
dev.fsamy.limits = [2, 3.1]
print("done")
if self.OMNYTools.yesno("Init of tracking stages. Did you remove the outer laser flight tubes?"):
user_input = input(
"Init of tracking stages. Did you remove the outer laser flight tubes? [y/n]"
)
if user_input == "y":
print("good then")
else:
print("Stopping.")
@@ -177,7 +150,8 @@ class FlomniInitStagesMixin:
dev.ftrackz.limits = [4.5, 5.5]
print("done")
if self.OMNYTools.yesno("Init of sample stage. Is the piezo at about 0 deg?"):
user_input = input("Init of sample stage. Is the piezo at about 0 deg? [y/n]")
if user_input == "y":
print("good then")
else:
print("Stopping.")
@@ -194,7 +168,11 @@ class FlomniInitStagesMixin:
print("done")
print("Initializing UPR stage.")
if self.OMNYTools.yesno("To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"):
user_input = input(
"To ensure that the end switches work, please check that they are currently not pushed."
" Is everything okay? [y/n]"
)
if user_input == "y":
print("good then")
else:
print("Stopping.")
@@ -215,7 +193,8 @@ class FlomniInitStagesMixin:
time.sleep(1)
continue
break
if self.OMNYTools.yesno("Shall I start the index search?"):
user_input = input("Shall I start the index search? [y/n]")
if user_input == "y":
print("good then. Starting index search.")
else:
print("Stopping.")
@@ -234,7 +213,11 @@ class FlomniInitStagesMixin:
dev.fsamroy.limits = [-5, 365]
print("done")
if self.OMNYTools.yesno("Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"):
user_input = input(
"Init of foptx. Can the stage move to the positive limit without collision? Attention:"
" tracker flight tube! [y/n]"
)
if user_input == "y":
print("good then")
else:
print("Stopping.")
@@ -258,7 +241,8 @@ class FlomniInitStagesMixin:
continue
break
if self.OMNYTools.yesno("Start limit switch search of fopty?"):
user_input = input("Start limit switch search of fopty? [y/n]")
if user_input == "y":
print("good then")
else:
print("Stopping.")
@@ -291,7 +275,8 @@ class FlomniInitStagesMixin:
return False
def set_limits(self):
if self.OMNYTools.yesno("Set default limits for flOMNI?"):
user_input = input("Set default limits for flOMNI? [y/n]")
if user_input == "y":
print("setting limits...")
else:
print("Stopping.")
@@ -318,7 +303,8 @@ class FlomniInitStagesMixin:
dev.ftrackz.limits = [4.5, 5.5]
def _align_setup(self):
if self.OMNYTools.yesno("Start moving stages to default initial positions?", "y"):
user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
print("Start moving stages...")
else:
print("Stopping.")
@@ -411,8 +397,7 @@ class FlomniSampleTransferMixin:
raise FlomniError("Ftray is not at the 'IN' position. Aborting.")
def ftransfer_flomni_stage_in(self):
sample_in_position = dev.flomni_samples.is_sample_slot_used(0)
#bool(float(dev.flomni_samples.sample_placed.sample0.get()))
sample_in_position = bool(float(dev.flomni_samples.sample_placed.sample0.get()))
if not sample_in_position:
raise FlomniError("There is no sample in the sample stage. Aborting.")
self.reset_correction()
@@ -425,8 +410,6 @@ class FlomniSampleTransferMixin:
umv(dev.fsamx, fsamx_in)
dev.fsamx.limits = [fsamx_in - 0.4, fsamx_in + 0.4]
self.flomnigui_idle()
def laser_tracker_show_all(self):
dev.rtx.controller.laser_tracker_show_all()
@@ -466,10 +449,6 @@ class FlomniSampleTransferMixin:
self.device_manager.devices.fsamx.controller.lights_on()
def ftransfer_flomni_stage_out(self):
self.flomnigui_show_cameras()
self.flomnigui_raise()
target_pos = -162
if np.isclose(dev.fsamx.readback.get(), target_pos, 0.01):
return
@@ -517,20 +496,22 @@ class FlomniSampleTransferMixin:
self.check_tray_in()
self.check_sensor_connected()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
sample_in_gripper = bool(float(dev.flomni_samples.sample_in_gripper.get()))
if sample_in_gripper:
raise FlomniError(
"The gripper does carry a sample. Cannot proceed getting another sample."
)
sample_in_position = dev.flomni_samples.is_sample_slot_used(position)
sample_signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
sample_in_position = bool(float(sample_signal.get()))
if not sample_in_position:
raise FlomniError(f"The planned pick position [{position}] does not have a sample.")
self.flomnigui_show_cameras()
if self.OMNYTools.yesno("Please confirm that there is currently no sample in the gripper. It would be dropped!", "y"):
user_input = input(
"Please confirm that there is currently no sample in the gripper. It would be dropped!"
" [y/n]"
)
if user_input == "y":
print("good then")
else:
print("Stopping.")
@@ -574,12 +555,12 @@ class FlomniSampleTransferMixin:
self.check_tray_in()
self.check_sensor_connected()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
#bool(float(dev.flomni_samples.sample_in_gripper.get()))
sample_in_gripper = bool(float(dev.flomni_samples.sample_in_gripper.get()))
if not sample_in_gripper:
raise FlomniError("The gripper does not carry a sample.")
sample_in_position = dev.flomni_samples.is_sample_slot_used(position)
sample_signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
sample_in_position = bool(float(sample_signal.get()))
if sample_in_position:
raise FlomniError(f"The planned put position [{position}] already has a sample.")
@@ -612,9 +593,8 @@ class FlomniSampleTransferMixin:
self.flomni_modify_storage_non_interactive(100, 0, "-")
self.flomni_modify_storage_non_interactive(position, 1, sample_name)
if position == 0:
self.ftransfer_flomni_stage_in()
bec.queue.next_dataset_number += 1
# TODO: flomni_stage_in if position == 0
# bec.queue.next_dataset_number += 1
def sample_get_name(self, position: int = 0) -> str:
"""
@@ -625,51 +605,36 @@ class FlomniSampleTransferMixin:
def ftransfer_sample_change(self, new_sample_position: int):
self.check_tray_in()
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
if sample_in_gripper:
raise FlomniError("There is already a sample in the gripper. Aborting.")
self.check_position_is_valid(new_sample_position)
if new_sample_position == 0:
raise FlomniError("The new sample to place cannot be the sample in the sample stage. Aborting.")
# sample_placed = getattr(
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
# ).get()
sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position)
sample_placed = getattr(
dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
).get()
if not sample_placed:
raise FlomniError(
f"There is currently no sample in position [{new_sample_position}]. Aborting."
)
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0)
sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
if sample_in_sample_stage:
# find a new home for the sample...
empty_slots = []
# for name, val in dev.flomni_samples.read().items():
# if "flomni_samples_sample_placed_sample" not in name:
# continue
# if val.get("value") == 0:
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
for j in range(1,20):
if not dev.flomni_samples.is_sample_slot_used(j):
empty_slots.append(j)
for name, val in dev.flomni_samples.read().items():
if "flomni_samples_sample_placed_sample" not in name:
continue
if val.get("value") == 0:
empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
if not empty_slots:
raise FlomniError("There are no empty slots available. Aborting.")
print(f"The following slots are empty: {empty_slots}.")
while True:
user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}] ")
if user_input.strip() == "":
# No entry: use default
user_input = empty_slots[0]
break
user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}]")
try:
user_input = int(user_input)
if user_input not in empty_slots:
@@ -735,20 +700,20 @@ class FlomniSampleTransferMixin:
if confirm != -1:
return
if self.OMNYTools.yesno("All OK? Continue?", "y"):
user_input = input("All OK? Continue? [y/n]")
if user_input == "y":
print("good then")
dev.ftransy.controller.socket_put_confirmed("confirm=1")
else:
print("Stopping.")
exit
return
def ftransfer_gripper_is_open(self) -> bool:
status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip()))
return status
def ftransfer_gripper_open(self):
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
#dev.flomni_samples.sample_in_gripper.get()
sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
if sample_in_gripper:
raise FlomniError(
"Cannot open gripper. There is still a sample in the gripper! Aborting."
@@ -768,8 +733,11 @@ class FlomniSampleTransferMixin:
fsamx_pos = dev.fsamx.readback.get()
if position == 0 and fsamx_pos > -160:
if self.OMNYTools.yesno("May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", "y"):
user_input = input(
"May the flomni stage be moved out for the sample change? Feedback will be disabled"
" and alignment will be lost! [y/n]"
)
if user_input == "y":
print("good then")
self.ftransfer_flomni_stage_out()
else:
@@ -924,20 +892,7 @@ class FlomniSampleTransferMixin:
class FlomniAlignmentMixin:
import csaxs_bec
import os
from pathlib import Path
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
default_correction_file_rel = "correction_flomni_20210300_360deg.txt"
# Build the absolute path correctly
default_correction_file = (
csaxs_bec_basepath.parent / 'bec_ipython_client' / 'plugins' / 'flomni' / default_correction_file_rel
).resolve()
default_correction_file = "correction_flomni_20210300_360deg.txt"
def reset_correction(self, use_default_correction=True):
"""
@@ -1151,7 +1106,6 @@ class Flomni(
FlomniAlignmentMixin,
FlomniOpticsMixin,
cSAXSBeamlineChecks,
flomniGuiTools
):
def __init__(self, client):
super().__init__()
@@ -1174,23 +1128,16 @@ class Flomni(
self.corr_pos_y_2 = []
self.corr_angle_y_2 = []
self.progress = {}
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 1
self.progress["projection"] = 0
self.progress["total_projections"] = 1
self.progress["angle"] = 0
self.progress["tomo_type"] = 0
self.OMNYTools = OMNYTools(self.client)
self.align = XrayEyeAlign(self.client, self)
self.set_client(client)
def start_x_ray_eye_alignment(self, keep_shutter_open=False):
if self.OMNYTools.yesno("Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y"):
def start_x_ray_eye_alignment(self):
user_input = input(
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample. [Y/n]"
)
if user_input == "y" or user_input == "":
self.align = XrayEyeAlign(self.client, self)
try:
self.align.align(keep_shutter_open)
self.align.align()
except KeyboardInterrupt as exc:
fsamx_in = self._get_user_param_safe(dev.fsamx, "in")
if np.isclose(fsamx_in, dev.fsamx.readback.get(), 0.5):
@@ -1199,11 +1146,11 @@ class Flomni(
umv(dev.fsamx, fsamx_in)
raise exc
def xrayeye_update_frame(self,keep_shutter_open=False):
self.align.update_frame(keep_shutter_open)
def xrayeye_update_frame(self):
self.align.update_frame()
def xrayeye_alignment_start(self, keep_shutter_open=False):
self.start_x_ray_eye_alignment(keep_shutter_open)
def xrayeye_alignment_start(self):
self.start_x_ray_eye_alignment()
def drive_axis_to_limit(self, device, direction):
axis_id = device._config["deviceConfig"].get("axis_Id")
@@ -1623,9 +1570,6 @@ class Flomni(
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
"""start a tomo scan"""
self.flomnigui_show_progress()
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy()
@@ -1762,7 +1706,6 @@ class Flomni(
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
self._flomnigui_update_progress()
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
@@ -1852,7 +1795,7 @@ class Flomni(
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
"~/tomography_scannumbers.txt"
"~/Data10/specES1/dat-files/tomography_scannumbers.txt"
)
with open(tomo_scan_numbers_file, "a+") as out_file:
# pylint: disable=undefined-variable
@@ -1951,8 +1894,8 @@ class Flomni(
)
print(f"\nSample name: {self.sample_name}\n")
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
user_input = input("Are these parameters correctly set for your scan? [Y/n]")
if user_input == "y" or user_input == "":
print("... excellent!")
else:
self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)

View File

@@ -33,7 +33,7 @@ class FlomniOpticsMixin:
feyex_in = self._get_user_param_safe("feyex", "in")
feyey_in = self._get_user_param_safe("feyey", "in")
umv(dev.feyex, feyex_in, dev.feyey, feyey_in)
#self.align.update_frame()
self.align.update_frame()
def _ffzp_in(self):
foptx_in = self._get_user_param_safe("foptx", "in")

View File

@@ -1,173 +0,0 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class flomniGuiToolsError(Exception):
pass
class flomniGuiTools:
def __init__(self):
self.text_box = None
self.progressbar = None
def set_client(self, client):
self.client = client
self.gui = self.client.gui
def flomnigui_show_gui(self):
if "flomni" in self.gui.windows:
self.gui.flomni.show()
else:
self.gui.new("flomni")
def flomnigui_stop_gui(self):
self.gui.flomni.hide()
def flomnigui_raise(self):
self.gui.flomni.raise_window()
# def flomnigui_show_xeyealign(self):
# self.flomnigui_show_gui()
# if self.xeyegui is None:
# self.flomnigui_remove_all_docks()
# self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# # start live
# if not dev.cam_xeye.live_mode:
# dev.cam_xeye.live_mode = True
def flomnigui_show_xeyealign(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# start live
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
def _flomnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui,"flomni"):
if hasattr(self.gui.flomni,attribute_name):
return False
return True
def flomnigui_show_cameras(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"):
self.flomnigui_remove_all_docks()
camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
camera_gripper_image.image(("cam_flomni_gripper", "preview"))
camera_gripper_image.lock_aspect_ratio = True
camera_gripper_image.enable_fps_monitor = True
camera_gripper_image.enable_toolbar = False
camera_gripper_image.outer_axes = False
camera_gripper_image.inner_axes = False
dev.cam_flomni_gripper.start_live_mode()
else:
print("Cannot open camera_gripper. Device does not exist.")
camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
camera_overview_image.image(("cam_flomni_overview", "preview"))
camera_overview_image.lock_aspect_ratio = True
camera_overview_image.enable_fps_monitor = True
camera_overview_image.enable_toolbar = False
camera_overview_image.outer_axes = False
camera_overview_image.inner_axes = False
dev.cam_flomni_overview.start_live_mode()
else:
print("Cannot open camera_overview. Device does not exist.")
def flomnigui_remove_all_docks(self):
dev.cam_flomni_overview.stop_live_mode()
dev.cam_flomni_gripper.stop_live_mode()
dev.cam_xeye.live_mode = False
self.gui.flomni.delete_all()
self.progressbar = None
self.text_box = None
def flomnigui_idle(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("idle_text_box"):
self.flomnigui_remove_all_docks()
idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
text = (
"<pre>"
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n"
+ "/ .-'| |' .-. '| `.' || ,'.| || | \n"
+ "| `-,| || | | || |'.'| || |' ' || | \n"
+ "| .-'| |' '-' '| | | || | ` || | \n"
+ "`--' `--' `-----' `--' `--'`--' `--'`--' \n"
+ "</pre>"
)
idle_text_box.set_html_text(text)
def _flomnicam_check_device_exists(self, device):
try:
device
except:
return False
else:
return True
def flomnigui_show_progress(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("progressbar"):
self.flomnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
self.progressbar.enable_auto_updates(False)
# Set precision for the self.progressbar display
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
# Setting multiple rigns with different values
self.progressbar.set_number_of_bars(3)
self.progressbar.rings[0].set_update("manual")
self.progressbar.rings[1].set_update("manual")
self.progressbar.rings[2].set_update("scan")
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.flomni.new(name="progress_text").new("TextBox")
self._flomnigui_update_progress()
def _flomnigui_update_progress(self):
if self.progressbar is not None:
progress = self.progress["projection"] / self.progress["total_projections"] * 100
subtomo_progress = (
self.progress["subtomo_projection"]
/ self.progress["subtomo_total_projections"]
* 100
)
self.progressbar.set_value([progress, subtomo_progress, 0])
if self.text_box is not None:
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
self.text_box.set_plain_text(text)
if __name__ == "__main__":
from bec_lib.client import BECClient
from bec_widgets.cli.client_utils import BECGuiClient
client = BECClient()
client.start()
client.gui = BECGuiClient()
flomni_gui = flomniGuiTools(client)
flomni_gui.flomnigui_show_gui()
flomni_gui.flomnigui_show_progress()

View File

@@ -7,7 +7,7 @@ from typing import TYPE_CHECKING
from bec_lib import bec_logger
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
logger = bec_logger.logger
# import builtins to avoid linter errors
@@ -22,7 +22,6 @@ if TYPE_CHECKING:
class XrayEyeAlign:
# pixel calibration, multiply to get mm
labview=False
PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning
def __init__(self, client, flomni: Flomni) -> None:
@@ -41,40 +40,28 @@ class XrayEyeAlign:
def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self,keep_shutter_open=False):
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
if not self.labview:
self.flomni.flomnigui_show_xeyealign()
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
def update_frame(self):
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
# start live
epics_put("XOMNYI-XEYE-ACQ:0", 1)
if self.labview:
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
fshopen()
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
time.sleep(0.5)
# stop live view
if not keep_shutter_open:
epics_put("XOMNYI-XEYE-ACQ:0", 0)
time.sleep(0.1)
fshclose()
print("got new frame")
else:
print("Staying in live view, shutter is and remains open!")
epics_put("XOMNYI-XEYE-ACQ:0", 0)
time.sleep(1)
# fshclose
print("got new frame")
def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable
@@ -100,23 +87,12 @@ class XrayEyeAlign:
def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
def align(self,keep_shutter_open=False):
if not keep_shutter_open:
print("This routine can be called with paramter keep_shutter_open=True to keep the shutter always open")
self.send_message("Getting things ready. Please wait...")
#potential unresolved movement requests to zero
epics_put("XOMNYI-XEYE-MVX:0", 0)
epics_put("XOMNYI-XEYE-MVY:0", 0)
def align(self):
# reset shift xy and fov params
self._reset_init_values()
self.flomni.lights_off()
self.flomni.flomnigui_show_xeyealign()
self.flomni.flomnigui_raise()
self.tomo_rotate(0)
epics_put("XOMNYI-XEYE-ANGLE:0", 0)
@@ -143,7 +119,7 @@ class XrayEyeAlign:
umv(dev.fsamx, fsamx_in - 0.25)
self.flomni.ffzp_in()
self.update_frame(keep_shutter_open)
self.update_frame()
# enable submit buttons
self.movement_buttons_enabled = False
@@ -176,18 +152,17 @@ class XrayEyeAlign:
self.flomni.feedback_disable()
umv(dev.fsamx, fsamx_in - 0.25)
if self.labview:
self.update_frame(keep_shutter_open)
epics_put("XOMNYI-XEYE-RECBG:0", 1)
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
time.sleep(0.5)
print("waiting for background frame...")
self.update_frame()
epics_put("XOMNYI-XEYE-RECBG:0", 1)
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
time.sleep(0.5)
print("waiting for background frame...")
umv(dev.fsamx, fsamx_in)
time.sleep(0.5)
self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open)
self.update_frame()
self.send_message("Adjust sample height and submit center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled = True
@@ -200,7 +175,7 @@ class XrayEyeAlign:
umv(dev.rtx, 0)
self.tomo_rotate(k * 45)
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame(keep_shutter_open)
self.update_frame()
self.send_message("Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
@@ -224,7 +199,7 @@ class XrayEyeAlign:
if k > 0:
epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000)
time.sleep(3)
self.update_frame(keep_shutter_open)
self.update_frame()
if k < 2:
# allow movements, store movements to calculate center
@@ -235,7 +210,7 @@ class XrayEyeAlign:
time.sleep(2)
epics_put("XOMNYI-XEYE-MVY:0", 0)
self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open)
self.update_frame()
time.sleep(0.2)
self.write_output()
@@ -246,16 +221,8 @@ class XrayEyeAlign:
umv(dev.rtx, 0)
# free camera
if self.labview:
epics_put("XOMNYI-XEYE-ACQ:0", 2)
if keep_shutter_open and not self.labview:
if self.flomni.OMNYTools.yesno("Close the shutter now?","y"):
fshclose()
epics_put("XOMNYI-XEYE-ACQ:0", 0)
if not self.labview:
self.flomni.flomnigui_idle()
# free camera
epics_put("XOMNYI-XEYE-ACQ:0", 2)
print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy"

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
logger = bec_logger.logger
@@ -14,7 +14,6 @@ logger = bec_logger.logger
_Widgets = {
"OmnyAlignment": "OmnyAlignment",
"XRayEye": "XRayEye",
}
@@ -74,75 +73,3 @@ class OmnyAlignment(RPCBase):
"""
None
"""
class XRayEye(RPCBase):
@rpc_call
def active_roi(self) -> "BaseROI | None":
"""
Return the currently active ROI, or None if no ROI is active.
"""
@property
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@property
@rpc_call
def user_message(self):
"""
None
"""
@user_message.setter
@rpc_call
def user_message(self):
"""
None
"""
@property
@rpc_call
def sample_name(self):
"""
None
"""
@sample_name.setter
@rpc_call
def sample_name(self):
"""
None
"""
@property
@rpc_call
def enable_move_buttons(self):
"""
None
"""
@enable_move_buttons.setter
@rpc_call
def enable_move_buttons(self):
"""
None
"""
class XRayEye2DControl(RPCBase):
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""

View File

@@ -63,7 +63,7 @@ class OmnyAlignment(BECWidget, QWidget):
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
self.ui.moveUpButton.clicked.connect(self.on_move_up)
@property
@@ -98,10 +98,10 @@ class OmnyAlignment(BECWidget, QWidget):
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam_xeye")
image.image("cam200")
return
image.disconnect_monitor("cam_xeye")
image.disconnect_monitor("cam200")
@property

View File

@@ -86,7 +86,7 @@
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam_xeye</string>
<string>cam200</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye_plugin import XRayEyePlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(XRayEyePlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -1,426 +0,0 @@
from __future__ import annotations
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import (
QFrame,
QGridLayout,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QSizePolicy,
QSpinBox,
QToolButton,
QVBoxLayout,
QWidget,
)
logger = bec_logger.logger
CAMERA = ("cam_xeye", "image")
class XRayEye2DControl(BECWidget, QWidget):
def __init__(self, parent=None, step_size: int = 100, *arg, **kwargs):
super().__init__(parent=parent, *arg, **kwargs)
self.get_bec_shortcuts()
self._step_size = step_size
self.root_layout = QGridLayout(self)
self.setStyleSheet("""
QToolButton {
border: 1px solid;
border-radius: 4px;
}
""")
# Up
self.move_up_button = QToolButton(parent=self)
self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
self.root_layout.addWidget(self.move_up_button, 0, 2)
# Up tweak button
self.move_up_tweak_button = QToolButton(parent=self)
self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
# Left
self.move_left_button = QToolButton(parent=self)
self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
self.root_layout.addWidget(self.move_left_button, 2, 0)
# Left tweak button
self.move_left_tweak_button = QToolButton(parent=self)
self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
# Right
self.move_right_button = QToolButton(parent=self)
self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
self.root_layout.addWidget(self.move_right_button, 2, 4)
# Right tweak button
self.move_right_tweak_button = QToolButton(parent=self)
self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
# Down
self.move_down_button = QToolButton(parent=self)
self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
self.root_layout.addWidget(self.move_down_button, 4, 2)
# Down tweak button
self.move_down_tweak_button = QToolButton(parent=self)
self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
# Connections
self.move_up_button.clicked.connect(lambda: self.move("up", tweak=False))
self.move_up_tweak_button.clicked.connect(lambda: self.move("up", tweak=True))
self.move_down_button.clicked.connect(lambda: self.move("down", tweak=False))
self.move_down_tweak_button.clicked.connect(lambda: self.move("down", tweak=True))
self.move_left_button.clicked.connect(lambda: self.move("left", tweak=False))
self.move_left_tweak_button.clicked.connect(lambda: self.move("left", tweak=True))
self.move_right_button.clicked.connect(lambda: self.move("right", tweak=False))
self.move_right_tweak_button.clicked.connect(lambda: self.move("right", tweak=True))
@SafeProperty(int)
def step_size(self) -> int:
return self._step_size
@step_size.setter
def step_size(self, step_size: int):
self._step_size = step_size
@SafeSlot(bool)
def enable_controls_hor(self, enable: bool):
self.move_left_button.setEnabled(enable)
self.move_left_tweak_button.setEnabled(enable)
self.move_right_button.setEnabled(enable)
self.move_right_tweak_button.setEnabled(enable)
@SafeSlot(bool)
def enable_controls_ver(self, enable: bool):
self.move_up_button.setEnabled(enable)
self.move_up_tweak_button.setEnabled(enable)
self.move_down_button.setEnabled(enable)
self.move_down_tweak_button.setEnabled(enable)
def move(self, direction: str, tweak: bool = False):
step = self._step_size
if tweak:
step = int(self._step_size / 5)
if direction == "up":
self.dev.omny_xray_gui.mvy.set(step)
elif direction == "down":
self.dev.omny_xray_gui.mvy.set(-step)
elif direction == "left":
self.dev.omny_xray_gui.mvx.set(-step)
elif direction == "right":
self.dev.omny_xray_gui.mvx.set(step)
else:
logger.warning(f"Unknown direction {direction} for move command.")
class XRayEye(BECWidget, QWidget):
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self.get_bec_shortcuts()
self._init_ui()
self._make_connections()
# Connection to redis endpoints
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
self.connect_motors()
self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger)
def _init_ui(self):
self.core_layout = QHBoxLayout(self)
self.image = Image(parent=self)
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
self.image.inner_axes = False # Disable inner axes to maximize image area
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
# Control panel on the right: vertical layout inside a fixed-width widget
self.control_panel = QWidget(parent=self)
self.control_panel_layout = QVBoxLayout(self.control_panel)
self.control_panel_layout.setContentsMargins(0, 0, 0, 0)
self.control_panel_layout.setSpacing(10)
# ROI toolbar + Live toggle (header row)
self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
compact_orientation="horizontal")
header_row = QHBoxLayout()
header_row.setContentsMargins(0, 0, 0, 0)
header_row.setSpacing(8)
header_row.addWidget(self.roi_manager, 0)
header_row.addStretch()
self.live_preview_label = QLabel("Live Preview", parent=self)
self.live_preview_toggle = ToggleSwitch(parent=self)
self.live_preview_toggle.checked = False
header_row.addWidget(self.live_preview_label, 0, Qt.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignVCenter)
self.control_panel_layout.addLayout(header_row)
# separator
self.control_panel_layout.addWidget(self._create_separator())
# 2D Positioner (fixed size)
self.motor_control_2d = XRayEye2DControl(parent=self)
self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignTop | Qt.AlignCenter)
# separator
self.control_panel_layout.addWidget(self._create_separator())
# Step size label
step_size_form = QGridLayout()
# General Step size
self.step_size = QSpinBox(parent=self)
self.step_size.setRange(10, 100)
self.step_size.setSingleStep(10)
self.step_size.setValue(100)
# Submit button
self.submit_button = QPushButton("Submit", parent=self)
# Add to layout form
step_size_form.addWidget(QLabel("Horizontal", parent=self), 0, 0)
step_size_form.addWidget(self.step_size, 0, 1)
step_size_form.addWidget(QLabel("Vertical", parent=self), 1, 0)
step_size_form.addWidget(self.submit_button, 2, 0, 1, 2)
# Add form to control panel
self.control_panel_layout.addLayout(step_size_form)
# Push form to bottom
self.control_panel_layout.addStretch()
# Sample/Message form (bottom)
form = QGridLayout()
self.sample_name_line_edit = QLineEdit(parent=self)
self.sample_name_line_edit.setReadOnly(True)
form.addWidget(QLabel("Sample", parent=self), 0, 0)
form.addWidget(self.sample_name_line_edit, 0, 1)
self.message_line_edit = QLineEdit(parent=self)
self.message_line_edit.setReadOnly(True)
form.addWidget(QLabel("Message", parent=self), 1, 0)
form.addWidget(self.message_line_edit, 1, 1)
self.control_panel_layout.addLayout(form)
# Fix panel width and allow vertical expansion
self.control_panel.adjustSize()
p_hint = self.control_panel.sizeHint()
self.control_panel.setFixedWidth(p_hint.width())
self.control_panel.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
# Core Layout: image (expanding) | control panel (fixed)
self.core_layout.addWidget(self.image)
self.core_layout.addWidget(self.control_panel)
def _make_connections(self):
# Fetch initial state
self.on_live_view_enabled(True)
self.step_size.setValue(self.motor_control_2d.step_size)
# Make connections
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
self.submit_button.clicked.connect(self.submit)
def _create_separator(self):
sep = QFrame(parent=self)
sep.setFrameShape(QFrame.HLine)
sep.setFrameShadow(QFrame.Sunken)
sep.setLineWidth(1)
return sep
def _init_gui_trigger(self):
self.dev.omny_xray_gui.read()
################################################################################
# Device Connection logic
################################################################################
def connect_motors(self):
""" Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
for motor in possible_motors:
if motor in self.dev:
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
logger.info(f"Succesfully connected to {motor}")
################################################################################
# Properties ported from the original OmnyAlignment, can be adjusted as needed
################################################################################
@SafeProperty(str)
def user_message(self):
return self.message_line_edit.text()
@user_message.setter
def user_message(self, message: str):
self.message_line_edit.setText(message)
@SafeProperty(str)
def sample_name(self):
return self.sample_name_line_edit.text()
@sample_name.setter
def sample_name(self, message: str):
self.sample_name_line_edit.setText(message)
@SafeProperty(bool)
def enable_move_buttons(self):
return self.motor_control_2d.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled: bool):
self.motor_control_2d.setEnabled(enabled)
def active_roi(self) -> BaseROI | None:
"""Return the currently active ROI, or None if no ROI is active."""
return self.roi_manager.single_active_roi
################################################################################
# Slots ported from the original OmnyAlignment, can be adjusted as needed
################################################################################
@SafeSlot()
def get_roi_coordinates(self) -> dict | None:
"""Get the coordinates of the currently active ROI."""
roi = self.roi_manager.single_active_roi
if roi is None:
logger.warning("No active ROI")
return None
logger.info(f"Active ROI coordinates: {roi.get_coordinates()}")
return roi.get_coordinates()
@SafeSlot(bool)
def on_live_view_enabled(self, enabled: bool):
logger.info(f"Live view is enabled: {enabled}")
self.live_preview_toggle.blockSignals(True)
if enabled:
self.live_preview_toggle.checked = enabled
self.image.image(CAMERA)
self.live_preview_toggle.blockSignals(False)
return
self.image.disconnect_monitor(CAMERA)
self.live_preview_toggle.checked = enabled
self.live_preview_toggle.blockSignals(False)
@SafeSlot(bool, bool)
def on_motors_enable(self, x_enable: bool, y_enable: bool):
"""
Enable/Disable motor controls
Args:
x_enable(bool): enable x motor controls
y_enable(bool): enable y motor controls
"""
self.motor_control_2d.enable_controls_hor(x_enable)
self.motor_control_2d.enable_controls_ver(y_enable)
@SafeSlot(int)
def enable_submit_button(self, enable: int):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
if enable == -1:
self.submit_button.setEnabled(False)
else:
self.submit_button.setEnabled(True)
@SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict):
#TODO implement if needed
print(f"data: {data}")
print(f"meta: {meta}")
@SafeSlot(dict, dict)
def device_updates(self, data: dict, meta: dict):
"""
Slot to handle device updates from omny_xray_gui device.
Args:
data(dict): data from device
meta(dict): metadata from device
"""
signals = data.get('signals')
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
self.on_live_view_enabled(bool(enable_live_preview))
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
# Signals from epics gui device
# send message
user_message = signals.get("omny_xray_gui_send_message").get('value')
self.user_message = user_message
# sample name
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
self.sample_name = sample_message
# enable frame acquisition
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
self.on_live_view_enabled(bool(update_frame_acq))
# enable submit button
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
self.enable_submit_button(enable_submit_button)
@SafeSlot()
def submit(self):
"""Execute submit action by submit button."""
if self.roi_manager.single_active_roi is None:
logger.warning("No active ROI")
return
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
roi_center_x = roi_coordinates['center_x']
roi_center_y = roi_coordinates['center_y']
# Case of rectangular ROI
if isinstance(self.roi_manager.single_active_roi, RectangularROI):
roi_width = roi_coordinates['width']
roi_height = roi_coordinates['height']
elif isinstance(self.roi_manager.single_active_roi, CircularROI):
roi_width = roi_coordinates['diameter']
roi_height = roi_coordinates['radius']
else:
logger.warning("Unsupported ROI type for submit action.")
return
print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
# submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1)
def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
getattr(self.dev,CAMERA[0]).live_mode = False
super().cleanup()
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
win = XRayEye()
win.resize(1000, 800)
win.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['x_ray_eye.py']}

View File

@@ -1,57 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from bec_widgets.utils.bec_designer import designer_material_icon
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye import XRayEye
DOM_XML = """
<ui language='c++'>
<widget class='XRayEye' name='x_ray_eye'>
</widget>
</ui>
"""
class XRayEyePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = XRayEye(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(XRayEye.ICON_NAME)
def includeFile(self):
return "x_ray_eye"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "XRayEye"
def toolTip(self):
return "XRayEye"
def whatsThis(self):
return self.toolTip()

View File

@@ -213,8 +213,6 @@ ftransy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
userParameter:
sensor_voltage: -2.4
ftransz:
description: Sample transer Z
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
@@ -335,8 +333,8 @@ rtx:
readOnly: false
readoutPriority: on_request
userParameter:
low_signal: 10000
min_signal: 9000
low_signal: 11000
min_signal: 10000
rt_pid_voltage: -0.06219
rty:
description: flomni rt
@@ -364,105 +362,3 @@ rtz:
onFailure: buffer
readOnly: false
readoutPriority: on_request
############################################################
####################### Cameras ############################
############################################################
cam_flomni_gripper:
description: Camera sample changer
deviceClass: csaxs_bec.devices.omny.webcam_viewer.WebcamViewer
deviceConfig:
url: http://flomnicamserver:5000/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_flomni_overview:
description: Camera flomni overview
deviceClass: csaxs_bec.devices.omny.webcam_viewer.WebcamViewer
deviceConfig:
url: http://flomnicamserver:5001/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_xeye:
description: Camera flOMNI Xray eye ID1
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
deviceConfig:
camera_id: 1
bits_per_pixel: 24
num_rotation_90: 3
transpose: false
force_monochrome: true
m_n_colormode: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async
cam_ids_rgb:
description: Camera flOMNI Xray eye ID203
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
deviceConfig:
camera_id: 203
bits_per_pixel: 24
num_rotation_90: 3
transpose: false
force_monochrome: true
m_n_colormode: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async
# ############################################################
# ################### flOMNI temperatures ####################
# ############################################################
flomni_temphum:
description: flOMNI Temperatures and humidity
deviceClass: csaxs_bec.devices.omny.flomni_temp_and_humidity.FlomniTempHum
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
# ############################################################
# ########## OMNY / flOMNI / LamNI fast shutter ##############
# ############################################################
omnyfsh:
description: omnyfsh connects to read fast shutter at X12 if in that network
deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastEpicsShutter
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
#################### GUI Signals ###########################
############################################################
omny_xray_gui:
description: Gui Epics signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
calculated_signal:
description: Calculated signal from alignment for fit
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def just_rand():\n return 42"
enabled: true
readOnly: false
readoutPriority: baseline

View File

@@ -0,0 +1,35 @@
#Standard configuration of an Owis LTM80 linear stage
samx:
description: Owis LTM80 linear stage
deviceClass: ophyd_devices.devices.EpicsMotorEx
deviceConfig:
prefix: X12SA-ES2-ES01
motor_resolution: 0.00125
base_velocity: 0.25
velocity: 2.5
backlash_distance: 0.125
acceleration: 0.4
user_offset_dir: 0
direction_of_travel: 1
deviceTags:
- cSAXS
- user_motor
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false
#xbpm1x:
# description: 'X-ray BPM1 in frontend translation x'
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: 'X12SA-FE-XBPM1:TRX'
# onFailure: raise
# enabled: true
# readoutPriority: baseline
# readOnly: false
# softwareTrigger: false
# deviceTags:
# - cSAXS
# - frontend

View File

@@ -26,6 +26,20 @@ dmmx:
- cSAXS
- optics
#E_kev:
# description: 'Double Multilayer Monochromator, energy (keV)'
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: 'X12SA-OP-DMM1:ENERGY'
# onFailure: raise
# enabled: true
# readoutPriority: baseline
# readOnly: true
# softwareTrigger: false
# deviceTags:
# - cSAXS
# - optics
dmmy:
description: 'Double Multilayer Monochromator, translation Y'
deviceClass: ophyd.EpicsMotor

View File

@@ -1,129 +0,0 @@
"""Module for the EPICS integration of the AlliedVision Camera via Vimba SDK."""
import threading
import traceback
from enum import IntEnum
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd.areadetector import ADComponent as ADCpt
from ophyd.areadetector import DetectorBase
from ophyd_devices import PreviewSignal
from ophyd_devices.devices.areadetector.cam import VimbaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
logger = bec_logger.logger
class ACQUIRE_MODES(IntEnum):
"""Acquiring enums for Allied Vision Camera"""
ACQUIRING = 1
DONE = 0
class AlliedVisionCamera(PSIDeviceBase, DetectorBase):
"""
Epics Area Detector interface for the Allied Vision Alvium G1-507m camera via Vimba SDK.
The IOC runs with under the prefix: 'X12SA-GIGECAM-AV1:'.
"""
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
cam = ADCpt(VimbaDetectorCam, "cam1:")
image = ADCpt(ImagePlugin, "image1:")
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=0,
doc="Preview signal of the AlliedVision camera.",
)
def __init__(
self,
*,
name: str,
prefix: str,
poll_rate: int = 5,
scan_info=None,
device_manager=None,
**kwargs,
):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._poll_thread = threading.Thread(
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
self._poll_thread_kill_event = threading.Event()
self._poll_start_event = threading.Event()
if poll_rate > 10:
logger.warning(f"Poll rate too high for Camera {self.name}, setting to 10 Hz max.")
poll_rate = 10
self._poll_rate = poll_rate
self._unique_array_id = 0
self._pv_timeout = 2.0
self.r_lock = threading.RLock()
self.image: ImagePlugin
def start_live_mode(self) -> None:
"""Start live mode."""
if not self._poll_start_event.is_set():
self._poll_start_event.set()
self.cam.acquire.put(ACQUIRE_MODES.ACQUIRING.value) # Start acquisition
else:
logger.info(f"Live mode already started for {self.name}.")
def stop_live_mode(self) -> None:
"""Stop live mode."""
if self._poll_start_event.is_set():
self._poll_start_event.clear()
self.cam.acquire.put(ACQUIRE_MODES.DONE.value) # Stop acquisition
else:
logger.info(f"Live mode already stopped for {self.name}.")
def on_connected(self):
"""Reset the unique array ID on connection."""
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout)
self._poll_thread.start()
def _poll_array_data(self):
"""Poll the array data for preview updates."""
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
while self._poll_start_event.wait():
try:
# First check if there is a new image
if self.image.unique_id.get() != self._unique_array_id:
self._unique_array_id = self.image.unique_id.get()
else:
continue # No new image, skip update
# Get new image data
value = self.image.array_data.get()
if value is None:
logger.info(f"No image data available for preview of {self.name}")
continue
array_size = self.image.array_size.get()
if array_size[0] == 0: # 2D image, not color image
array_size = array_size[1:]
# Geometry correction for the image
data = np.reshape(value, array_size)
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while polling array data for preview of {self.name}: {content}"
)
def on_destroy(self):
"""Stop the polling thread on destruction."""
self._poll_thread_kill_event.set()
self._poll_start_event.set()
if self._poll_thread.is_alive():
self._poll_thread.join(timeout=2)

View File

@@ -37,7 +37,8 @@ import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from ophyd_devices import CompareStatus, DeviceStatus, TransitionStatus
from ophyd import DeviceStatus
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (

View File

@@ -143,7 +143,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
self._add_delay = add_delay or 0
self._raise_states = raise_states or []
super().__init__(
obj=signal,
device=signal,
callback=self._compare_callback,
timeout=timeout,
settle_time=settle_time,

View File

@@ -1 +1 @@
from .ids_camera import IDSCamera
from .ids_camera_new import IDSCamera

View File

@@ -18,7 +18,6 @@ import atexit
from typing import Literal
import numpy as np
import time
from bec_lib.logger import bec_logger
from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error
@@ -67,8 +66,8 @@ class IDSCameraObject:
check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject")
if (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
):
logger.info("Bayer color mode detected.")
# setup the color depth to the current windows setting
@@ -77,16 +76,16 @@ class IDSCameraObject:
) # TODO This raises an error - maybe check the m_n_colormode value
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
):
# for color camera models use RGB32 mode
self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
self.n_bits_per_pixel = self.ueye.INT(32)
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
):
# for color camera models use RGB32 mode
self.m_n_colormode = self.ueye.IS_CM_MONO8
@@ -160,17 +159,15 @@ class Camera:
"""
def __init__(
self,
camera_id: int,
m_n_colormode: Literal[0, 1, 2, 3] = 1,
bits_per_pixel: int = 24,
connect: bool = True,
force_monochrome: bool = False,
self,
camera_id: int,
m_n_colormode: Literal[0, 1, 2, 3] = 1,
bits_per_pixel: int = 24,
connect: bool = True,
):
self.ueye = ueye
self.camera_id = camera_id
self._inputs = {"m_n_colormode": m_n_colormode, "bits_per_pixel": bits_per_pixel}
self.force_monochrome = force_monochrome
self._connected = False
self.cam = None
atexit.register(self.on_disconnect)
@@ -200,16 +197,14 @@ class Camera:
self.cam = IDSCameraObject(self.camera_id, **self._inputs)
self._connected = True
def on_disconnect(self, delay_after: float = 0.3):
"""Disconnect from the camera and optionally wait a short time for driver cleanup."""
def on_disconnect(self):
"""Disconnect from the camera."""
try:
if self.cam and self.cam.h_cam:
check_error(self.ueye.is_ExitCamera(self.cam.h_cam), "IDSCameraObject")
self._connected = False
self.cam = None
if delay_after > 0:
time.sleep(delay_after)
logger.debug(f"Waited {delay_after:.2f}s after camera disconnect for cleanup.")
logger.info("Camera disconnected.")
except Exception as e:
logger.info(f"Error during camera disconnection: {e}")
@@ -268,19 +263,9 @@ class Camera:
if array is None:
logger.error("Failed to get image data from the camera.")
return None
img = np.reshape(
return np.reshape(
array, (self.cam.height.value, self.cam.width.value, self.cam.bytes_per_pixel)
)
# If RGB image (H, W, 3), reshuffle channels from BGR → RGB
if img.ndim == 3 and img.shape[2] == 3:
img = img[:, :, ::-1]
if self.force_monochrome:
gray = np.dot(img[..., :3], [0.2989, 0.5870, 0.1140]).astype(np.uint8)
# expand to 3D shape (H, W, 1) for consistency with real mono cams
img = np.expand_dims(gray, axis=-1)
img = np.ascontiguousarray(img)
return img
if __name__ == "__main__":

View File

@@ -1,225 +1,403 @@
"""IDS Camera class for cSAXS IDS cameras."""
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
import numpy as np
from bec_lib import messages
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind, Signal, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
from ophyd_devices.utils.bec_signals import PreviewSignal
logger = bec_logger.logger
class IDSCamera(PSIDeviceBase):
"""IDS Camera class for cSAXS.
This class inherits from PSIDeviceBase and implements the necessary methods
to interact with the IDS camera using the pyueye library.
class ROISignal(Signal):
"""
Signal to handle the Region of Interest (ROI) for the IDS camera.
It is a tuple of (x, y, width, height).
"""
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.", num_rotation_90=0,
transpose=False)
roi_signal = Cpt(
AsyncSignal,
name="roi_signal",
ndim=0,
max_size=1000,
doc="Signal for the region of interest (ROI).",
async_update={"type": "add", "max_shape": [None]},
)
def __init__(
self,
*,
name,
roi: tuple | None = None,
value=0,
dtype=None,
shape=None,
timestamp=None,
parent=None,
labels=None,
kind=Kind.hinted,
tolerance=None,
rtolerance=None,
metadata=None,
cl=None,
attr_name="",
):
super().__init__(
name=name,
value=value,
dtype=dtype,
shape=shape,
timestamp=timestamp,
parent=parent,
labels=labels,
kind=kind,
tolerance=tolerance,
rtolerance=rtolerance,
metadata=metadata,
cl=cl,
attr_name=attr_name,
)
self.roi = roi
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
def get(self, **kwargs):
image = self.parent.image_data.get().data
if not isinstance(image, np.ndarray):
return -1 # -1 if no valid image is available
if self.roi is None:
roi = (0, 0, image.shape[1], image.shape[0])
else:
roi = self.roi
if len(image.shape) > 2:
image = np.sum(image, axis=2) # Convert to grayscale if it's a color image
return np.sum(image[roi[1] : roi[1] + roi[3], roi[0] : roi[0] + roi[2]], (0, 1))
class IDSCamera(PSIDeviceBase):
""" "
#---------------------------------------------------------------------------------------------------------------------------------------
#Variables
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
sInfo = ueye.SENSORINFO()
cInfo = ueye.CAMINFO()
pcImageMemory = ueye.c_mem_p()
MemID = ueye.int()
rectAOI = ueye.IS_RECT()
pitch = ueye.INT()
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
bytes_per_pixel = int(nBitsPerPixel / 8)
ids_cam
...
"""
USER_ACCESS = ["start_live_mode", "stop_live_mode", "set_roi", "width", "height"]
image_data = Cpt(PreviewSignal, ndim=2, kind=Kind.omitted)
# roi_bot_left = Cpt(ROISignal, roi=(400, 525, 118, 105), kind=Kind.normal)
# roi_bot_right = Cpt(ROISignal, roi=(518, 525, 118, 105), kind=Kind.normal)
# roi_top_left = Cpt(ROISignal, roi=(400, 630, 118, 105), kind=Kind.normal)
# roi_top_right = Cpt(ROISignal, roi=(518, 630, 118, 105), kind=Kind.normal)
# roi_signal = Cpt(ROISignal, kind=Kind.normal, doc="Region of Interest signal")
def __init__(
self,
*,
name: str,
camera_id: int,
prefix: str = "",
scan_info: ScanInfo | None = None,
m_n_colormode: Literal[0, 1, 2, 3] = 1,
bits_per_pixel: Literal[8, 24] = 24,
live_mode: bool = False,
num_rotation_90: int = 0,
transpose: bool = False,
force_monochrome: bool = False,
**kwargs,
self,
prefix="",
*,
name: str,
camera_ID: int,
bits_per_pixel: int,
channels: int,
m_n_colormode: int,
kind=None,
device_manager=None,
**kwargs,
):
"""Initialize the IDS Camera.
Args:
name (str): Name of the device.
camera_id (int): The ID of the camera device.
prefix (str): Prefix for the device.
scan_info (ScanInfo | None): Scan information for the device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
"""
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self._live_mode_thread: threading.Thread | None = None
self._stop_live_mode_event: threading.Event = threading.Event()
self.cam = Camera(
camera_id=camera_id,
m_n_colormode=m_n_colormode,
bits_per_pixel=bits_per_pixel,
connect=False,
super().__init__(
prefix=prefix, name=name, kind=kind, device_manager=device_manager, **kwargs
)
self._live_mode = False
self._inputs = {"live_mode": live_mode}
self._mask = np.zeros((1, 1), dtype=np.uint8)
self.image.num_rotation_90 = num_rotation_90
self.image.transpose = transpose
self._force_monochrome = force_monochrome
############## Live Mode Methods ##############
@property
def mask(self) -> np.ndarray:
"""Return the current region of interest (ROI) for the camera."""
return self._mask
@mask.setter
def mask(self, value: np.ndarray):
"""
Set the region of interest (ROI) for the camera.
Args:
value (np.ndarray): The mask to set as the ROI.
"""
if value.ndim != 2:
raise ValueError("ROI mask must be a 2D array.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]:
raise ValueError(
f"ROI mask shape {value.shape} does not match image shape {img_shape}."
)
self._mask = value
@property
def live_mode(self) -> bool:
"""Return whether the camera is in live mode."""
return self._live_mode
@live_mode.setter
def live_mode(self, value: bool):
"""Set the live mode for the camera."""
if value != self._live_mode:
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
self._stop_live()
def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera."""
if x < 0 or y < 0 or width <= 0 or height <= 0:
raise ValueError("ROI coordinates and dimensions must be positive integers.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if x + width > img_shape[1] or y + height > img_shape[0]:
raise ValueError("ROI exceeds camera dimensions.")
mask = np.zeros(img_shape, dtype=np.uint8)
mask[y: y + height, x: x + width] = 1
self.mask = mask
def _start_live(self):
"""Start the live mode for the camera."""
if self._live_mode_thread is not None:
logger.info("Live mode thread is already running.")
return
self._stop_live_mode_event.clear()
self._live_mode_thread = threading.Thread(
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
self.camera_ID = camera_ID
self.bits_per_pixel = bits_per_pixel
self.bytes_per_pixel = None
self.channels = channels
self._m_n_colormode_input = m_n_colormode
self.m_n_colormode = None
self.ueye = ueye
self.h_cam = None
self.s_info = None
self.data_thread = None
self.c_info = None
self.pc_image_memory = None
self.mem_id = None
self.rect_aoi = None
self.pitch = None
self.n_bits_per_pixel = None
self.width = None
self.height = None
self.thread_event = threading.Event()
self.data_thread = None
self._roi: tuple | None = None # x, y, width, height
logger.info(
f"Deprecation warning: The IDSCamera class is deprecated. Use the new IDSCameraNew class instead."
)
self._live_mode_thread.start()
def _stop_live(self):
"""Stop the live mode for the camera."""
if self._live_mode_thread is None:
logger.info("Live mode thread is not running.")
return
self._stop_live_mode_event.set()
self._live_mode_thread.join(timeout=5)
if self._live_mode_thread.is_alive():
logger.warning("Live mode thread did not stop gracefully.")
def set_roi(self, x: int, y: int, width: int, height: int):
self._roi = (x, y, width, height)
def start_backend(self):
if self.ueye is None:
raise ImportError("The pyueye library is not installed.")
self.h_cam = self.ueye.HIDS(
self.camera_ID
) # 0: first available camera; 1-254: The camera with the specified camera ID
self.s_info = self.ueye.SENSORINFO()
self.c_info = self.ueye.CAMINFO()
self.pc_image_memory = self.ueye.c_mem_p()
self.mem_id = self.ueye.int()
self.rect_aoi = self.ueye.IS_RECT()
self.pitch = self.ueye.INT()
self.n_bits_per_pixel = self.ueye.INT(
self.bits_per_pixel
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
self.m_n_colormode = self.ueye.INT(
self._m_n_colormode_input
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
# Starts the driver and establishes the connection to the camera
ret = self.ueye.is_InitCamera(self.h_cam, None)
if ret != self.ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that c_info points to
ret = self.ueye.is_GetCameraInfo(self.h_cam, self.c_info)
if ret != self.ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
# You can query additional information about the sensor type used in the camera
ret = self.ueye.is_GetSensorInfo(self.h_cam, self.s_info)
if ret != self.ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
ret = self.ueye.is_ResetToDefault(self.h_cam)
if ret != self.ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
# Set display mode to DIB
ret = self.ueye.is_SetDisplayMode(self.h_cam, self.ueye.IS_SET_DM_DIB)
# Set the right color mode
if (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
):
# setup the color depth to the current windows setting
self.ueye.is_GetColorDepth(self.h_cam, self.n_bits_per_pixel, self.m_n_colormode)
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
print("IS_COLORMODE_BAYER: ")
print("\tm_n_colormode: \t\t", self.m_n_colormode)
print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
):
# for color camera models use RGB32 mode
m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
n_bits_per_pixel = self.ueye.INT(32)
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
print("IS_COLORMODE_CBYCRY: ")
print("\tm_n_colormode: \t\t", m_n_colormode)
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
):
# for color camera models use RGB32 mode
m_n_colormode = self.ueye.IS_CM_MONO8
n_bits_per_pixel = self.ueye.INT(8)
bytes_per_pixel = int(n_bits_per_pixel / 8)
print("IS_COLORMODE_MONOCHROME: ")
print("\tm_n_colormode: \t\t", m_n_colormode)
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else:
self._live_mode_thread = None
logger.info("Live mode stopped.")
# for monochrome camera models use Y8 mode
m_n_colormode = self.ueye.IS_CM_MONO8
n_bits_per_pixel = self.ueye.INT(8)
bytes_per_pixel = int(n_bits_per_pixel / 8)
print("else")
def _live_mode_loop(self, stop_event: threading.Event):
"""Loop to capture images in live mode."""
while not stop_event.is_set():
try:
self.process_data(self.cam.get_image_data())
except Exception as e:
logger.error(f"Error in live mode loop: {e}")
break
stop_event.wait(0.2) # 5 Hz
# Can be used to set the size and position of an "area of interest"(AOI) within an image
ret = self.ueye.is_AOI(
self.h_cam,
self.ueye.IS_AOI_IMAGE_GET_AOI,
self.rect_aoi,
self.ueye.sizeof(self.rect_aoi),
)
if ret != self.ueye.IS_SUCCESS:
print("is_AOI ERROR")
def process_data(self, image: np.ndarray | None):
"""Process the image data before sending it to the preview signal."""
if image is None:
return
self.image.put(image)
self.width = self.rect_aoi.s32Width
self.height = self.rect_aoi.s32Height
def get_last_image(self) -> np.ndarray:
"""Get the last captured image from the camera."""
image = self.image.get()
if image:
return image.data
# Prints out some information about the camera and the sensor
print("Camera model:\t\t", self.s_info.strSensorName.decode("utf-8"))
print("Camera serial no.:\t", self.c_info.SerNo.decode("utf-8"))
print("Maximum image width:\t", self.width)
print("Maximum image height:\t", self.height)
print()
############## User Interface Methods ##############
# ---------------------------------------------------------------------------------------------------------------------------------------
def on_connected(self):
"""Connect to the camera."""
self.cam.force_monochrome = self._force_monochrome
self.cam.on_connect()
self.live_mode = self._inputs.get("live_mode", False)
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
def on_destroy(self):
"""Clean up resources when the device is destroyed."""
self.cam.on_disconnect()
super().on_destroy()
def on_trigger(self):
"""Handle the trigger event."""
if not self.live_mode:
return
image = self.image.get()
if image is not None:
image: messages.DevicePreviewMessage
if self.mask.shape[0:2] != image.data.shape[0:2]:
logger.info(
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
)
return
if len(image.data.shape) == 3:
# If the image has multiple channels, apply the mask to each channel
data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data
n_channels = 3
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by n_bits_per_pixel
ret = self.ueye.is_AllocImageMem(
self.h_cam,
self.width,
self.height,
self.n_bits_per_pixel,
self.pc_image_memory,
self.mem_id,
)
if ret != self.ueye.IS_SUCCESS:
print("is_AllocImageMem ERROR")
else:
# Makes the specified image memory the active memory
ret = self.ueye.is_SetImageMem(self.h_cam, self.pc_image_memory, self.mem_id)
if ret != self.ueye.IS_SUCCESS:
print("is_SetImageMem ERROR")
else:
data = image.data * self.mask
n_channels = 1
self.roi_signal.put(np.sum(data) / (np.sum(self.mask) * n_channels))
# Set the desired color mode
ret = self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode)
# Activates the camera's live video mode (free run mode)
ret = self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT)
if ret != self.ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
# Enables the queue mode for existing image memory sequences
ret = self.ueye.is_InquireImageMem(
self.h_cam,
self.pc_image_memory,
self.mem_id,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
)
if ret != self.ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
else:
print("Press q to leave the programm")
# startmeasureframerate = True
# Gain = False
# Start live mode of camera immediately
self.start_live_mode()
def _start_data_thread(self):
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
self.data_thread.start()
def _receive_data_from_camera(self):
while not self.thread_event.is_set():
if self.ueye is None:
print("pyueye library not available.")
return
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = self.ueye.get_data(
self.pc_image_memory,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
copy=False,
)
# ...reshape it in an numpy array...
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
self.image_data.put(frame)
time.sleep(0.1)
def wait_for_connection(self, all_signals=False, timeout=10):
if ueye is None:
raise ImportError(
"The pyueye library is not installed or doesn't provide the necessary c libs"
)
super().wait_for_connection(all_signals, timeout)
def start_live_mode(self):
if self.data_thread is not None:
self.stop_live_mode()
self._start_data_thread()
def stop_live_mode(self):
"""Stopping the camera live mode."""
self.thread_event.set()
if self.data_thread is not None:
self.data_thread.join()
self.thread_event.clear()
self.data_thread = None
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.start_backend()
self.start_live_mode()
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
def on_destroy(self) -> None:
"""Called when the device is destroyed. Cleanup resources here."""
self.stop_live_mode()
if __name__ == "__main__":
# Example usage of the IDSCamera class
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")
# Example usage
camera = IDSCamera(name="camera", camera_ID=201, bits_per_pixel=24, channels=3, m_n_colormode=1)
camera.wait_for_connection()
camera.on_destroy()

View File

@@ -0,0 +1,226 @@
"""IDS Camera class for cSAXS IDS cameras."""
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
import numpy as np
from bec_lib import messages
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger
class IDSCamera(PSIDeviceBase):
"""IDS Camera class for cSAXS.
This class inherits from PSIDeviceBase and implements the necessary methods
to interact with the IDS camera using the pyueye library.
"""
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.")
roi_signal = Cpt(
AsyncSignal,
name="roi_signal",
ndim=0,
max_size=1000,
doc="Signal for the region of interest (ROI).",
async_update={"type": "add", "max_shape": [None]},
)
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
def __init__(
self,
*,
name: str,
camera_id: int,
prefix: str = "",
scan_info: ScanInfo | None = None,
m_n_colormode: Literal[0, 1, 2, 3] = 1,
bits_per_pixel: Literal[8, 24] = 24,
live_mode: bool = False,
**kwargs,
):
"""Initialize the IDS Camera.
Args:
name (str): Name of the device.
camera_id (int): The ID of the camera device.
prefix (str): Prefix for the device.
scan_info (ScanInfo | None): Scan information for the device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
"""
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self._live_mode_thread: threading.Thread | None = None
self._stop_live_mode_event: threading.Event = threading.Event()
self.cam = Camera(
camera_id=camera_id,
m_n_colormode=m_n_colormode,
bits_per_pixel=bits_per_pixel,
connect=False,
)
self._live_mode = False
self._inputs = {"live_mode": live_mode}
self._mask = np.zeros((1, 1), dtype=np.uint8)
############## Live Mode Methods ##############
@property
def mask(self) -> np.ndarray:
"""Return the current region of interest (ROI) for the camera."""
return self._mask
@mask.setter
def mask(self, value: np.ndarray):
"""
Set the region of interest (ROI) for the camera.
Args:
value (np.ndarray): The mask to set as the ROI.
"""
if value.ndim != 2:
raise ValueError("ROI mask must be a 2D array.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]:
raise ValueError(
f"ROI mask shape {value.shape} does not match image shape {img_shape}."
)
self._mask = value
@property
def live_mode(self) -> bool:
"""Return whether the camera is in live mode."""
return self._live_mode
@live_mode.setter
def live_mode(self, value: bool):
"""Set the live mode for the camera."""
if value != self._live_mode:
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
self._stop_live()
def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera."""
if x < 0 or y < 0 or width <= 0 or height <= 0:
raise ValueError("ROI coordinates and dimensions must be positive integers.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if x + width > img_shape[1] or y + height > img_shape[0]:
raise ValueError("ROI exceeds camera dimensions.")
mask = np.zeros(img_shape, dtype=np.uint8)
mask[y : y + height, x : x + width] = 1
self.mask = mask
def _start_live(self):
"""Start the live mode for the camera."""
if self._live_mode_thread is not None:
logger.info("Live mode thread is already running.")
return
self._stop_live_mode_event.clear()
self._live_mode_thread = threading.Thread(
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
)
self._live_mode_thread.start()
def _stop_live(self):
"""Stop the live mode for the camera."""
if self._live_mode_thread is None:
logger.info("Live mode thread is not running.")
return
self._stop_live_mode_event.set()
self._live_mode_thread.join(timeout=5)
if self._live_mode_thread.is_alive():
logger.warning("Live mode thread did not stop gracefully.")
else:
self._live_mode_thread = None
logger.info("Live mode stopped.")
def _live_mode_loop(self, stop_event: threading.Event):
"""Loop to capture images in live mode."""
while not stop_event.is_set():
try:
self.process_data(self.cam.get_image_data())
except Exception as e:
logger.error(f"Error in live mode loop: {e}")
break
stop_event.wait(0.2) # 5 Hz
def process_data(self, image: np.ndarray | None):
"""Process the image data before sending it to the preview signal."""
if image is None:
return
self.image.put(image)
def get_last_image(self) -> np.ndarray:
"""Get the last captured image from the camera."""
image = self.image.get()
if image:
return image.data
############## User Interface Methods ##############
def on_connected(self):
"""Connect to the camera."""
self.cam.on_connect()
self.live_mode = self._inputs.get("live_mode", False)
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
def on_destroy(self):
"""Clean up resources when the device is destroyed."""
self.cam.on_disconnect()
super().on_destroy()
def on_trigger(self):
"""Handle the trigger event."""
if not self.live_mode:
return
image = self.image.get()
if image is not None:
image: messages.DevicePreviewMessage
if self.mask.shape[0:2] != image.data.shape[0:2]:
logger.info(
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
)
return
if len(image.data.shape) == 3:
# If the image has multiple channels, apply the mask to each channel
data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data
n_channels = 3
else:
data = image.data * self.mask
n_channels = 1
self.roi_signal.put(
{
self.roi_signal.name: {
"value": np.sum(data)
/ (np.sum(self.mask) * n_channels), # TODO could be optimized
"timestamp": time.time(),
}
}
)
if __name__ == "__main__":
# Example usage of the IDSCamera class
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")

View File

@@ -24,25 +24,24 @@ class FlomniSampleStorage(Device):
SUB_VALUE = "value"
_default_sub = SUB_VALUE
sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {"auto_monitor": True}) for i in range(21)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {}) for i in range(21)
}
sample_placed = Dcpt(sample_placed)
sample_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True, "auto_monitor": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True})
for i in range(21)
}
sample_names = Dcpt(sample_names)
sample_in_gripper = Cpt(
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET", auto_monitor=True
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET"
)
sample_in_gripper_name = Cpt(
EpicsSignal,
name="sample_in_gripper_name",
read_pv="XOMNY-SAMPLE_DB_flomni100:GET.DESC",
string=True,
auto_monitor=True
)
def __init__(self, prefix="", *, name, **kwargs):

View File

@@ -1,208 +0,0 @@
import time
import datetime
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
from prettytable import FRAME, PrettyTable
import numpy as np
class FlomniTempHumError(Exception):
pass
class FlomniTempHum(Device):
USER_ACCESS = [
"show_all",
"help",
]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
temperature_mirror = Cpt(
EpicsSignal, name="temperature_mirror", read_pv="XOMNI-TEMPHUM-MIRROR:0.VAL"
)
temperature_mirrorset_set = Cpt(
EpicsSignal, name="temperature_mirrorset_set", read_pv="XOMNI-TEMPHUM-MIRRORSET_SET:0.VAL"
)
temperature_mirrorset_rb = Cpt(
EpicsSignal, name="temperature_mirrorset_rb", read_pv="XOMNI-TEMPHUM-MIRRORSET_RB:0.VAL"
)
temperature_osa = Cpt(
EpicsSignal, name="temperature_osa", read_pv="XOMNI-TEMPHUM-OSA:0.VAL"
)
temperature_osaset_set = Cpt(
EpicsSignal, name="temperature_osaset_set", read_pv="XOMNI-TEMPHUM-OSASET_SET:0.VAL"
)
temperature_osaset_rb = Cpt(
EpicsSignal, name="temperature_osaset_rb", read_pv="XOMNI-TEMPHUM-OSASET_RB:0.VAL"
)
omegactrl_alive = Cpt(
EpicsSignal, name="omegactrl_alive", read_pv="XOMNI-TEMPHUM-OMEGACTRL-ALIVE:0.VAL"
)
galilctrl_alive = Cpt(
EpicsSignal, name="galilctrl_alive", read_pv="XOMNI-TEMPHUM-GALILCTRL-ALIVE:0.VAL"
)
temperature_heater = Cpt(
EpicsSignal, name="temperature_heater", read_pv="XOMNI-TEMPHUM-HEATER:0.VAL"
)
temperature_heaterset_set = Cpt(
EpicsSignal, name="temperature_heaterset_set", read_pv="XOMNI-TEMPHUM-HEATERSET_SET:0.VAL"
)
temperature_heaterset_rb = Cpt(
EpicsSignal, name="temperature_heaterset_rb", read_pv="XOMNI-TEMPHUM-HEATERSET_RB:0.VAL"
)
temperature_heaterhousing = Cpt(
EpicsSignal, name="temperature_heaterhousing", read_pv="XOMNI-TEMPHUM-HEATERHOUSE:0.VAL"
)
temperature_heaterhousing_alarm = Cpt(
EpicsSignal, name="temperature_heaterhousing_alarm", read_pv="XOMNI-TEMPHUM-HEATERHOUSEALARM:0.VAL"
)
temperature_heater_enabled = Cpt(
EpicsSignal, name="temperature_heater_enabled", read_pv="XOMNI-TEMPHUM-HEAT_EN:0.VAL"
)
temperature_heater_enabled = Cpt(
EpicsSignal, name="temperature_heater_enabled", read_pv="XOMNI-TEMPHUM-HEAT_EN:0.VAL"
)
###### GALIL CONTROLLER
humidity_sensor1 = Cpt(
EpicsSignal, name="humidity_sensor1", read_pv="XOMNI-TEMPHUM-HUM1:0.VAL"
)
humidity_sensor2 = Cpt(
EpicsSignal, name="humidity_sensor2", read_pv="XOMNI-TEMPHUM-HUM2:0.VAL"
)
humidity_sensor1_temperature = Cpt(
EpicsSignal, name="humidity_sensor1_temperature", read_pv="XOMNI-TEMPHUM-TEMP1:0.VAL"
)
humidity_sensor2_temperature = Cpt(
EpicsSignal, name="humidity_sensor2_temperature", read_pv="XOMNI-TEMPHUM-TEMP2:0.VAL"
)
humidity_sensor1_err = Cpt(
EpicsSignal, name="humidity_sensor1_err", read_pv="XOMNI-TEMPHUM-ERR1:0.VAL"
)
humidity_sensor2_err = Cpt(
EpicsSignal, name="humidity_sensor2_err", read_pv="XOMNI-TEMPHUM-ERR2:0.VAL"
)
flow = Cpt(
EpicsSignal, name="flow", read_pv="XOMNI-TEMPHUM-FLOW:0.VAL"
)
flowset = Cpt(
EpicsSignal, name="flowset", read_pv="XOMNI-TEMPHUM-FLOWSET:0.VAL"
)
flowset_set = Cpt(
EpicsSignal, name="flowset_set", read_pv="XOMNI-TEMPHUM-FLOWSETSET:0.VAL"
)
humidityset = Cpt(
EpicsSignal, name="humidityset", read_pv="XOMNI-TEMPHUM-HUMSET:0.VAL"
)
humidityset_set = Cpt(
EpicsSignal, name="humidityset_set", read_pv="XOMNI-TEMPHUM-HUMSETSET:0.VAL"
)
suction = Cpt(
EpicsSignal, name="suction", read_pv="XOMNI-TEMPHUM-SUCTION:0.VAL"
)
valvedry = Cpt(
EpicsSignal, name="valvedry", read_pv="XOMNI-TEMPHUM-VALVEDRY:0.VAL"
)
valvewet = Cpt(
EpicsSignal, name="valvewet", read_pv="XOMNI-TEMPHUM-VALVEWET:0.VAL"
)
setuptemp = Cpt(
EpicsSignal, name="setuptemp", read_pv="XOMNI-TEMPHUM-SETUPTEMP:0.VAL"
)
def omega_controller_running(self):
time_diff = np.fabs(float(self.omegactrl_alive.get()) - time.time())
if time_diff > 120:
return False
else:
return True
def galil_controller_running(self):
time_diff = np.fabs(float(self.galilctrl_alive.get()) - time.time())
if time_diff > 120:
return False
else:
return True
def __init__(self, prefix="", *, name, **kwargs):
super().__init__(prefix, name=name, **kwargs)
self.temperature_mirror.subscribe(self._emit_value, run=False)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
def show_all(self):
print("=== flOMNI Temperature & Humidity Overview ===")
print("")
print("Temperatures:")
print(f" Mirror: {float(self.temperature_mirror.get()):7.2f} °C")
print(f" Mirror Setpoint (RB): {float(self.temperature_mirrorset_rb.get()):7.2f} °C")
print(f" OSA: {float(self.temperature_osa.get()):7.2f} °C")
print(f" OSA Setpoint (RB): {float(self.temperature_osaset_rb.get()):7.2f} °C")
print(f" Heater: {float(self.temperature_heater.get()):7.2f} °C")
print(f" Heater Setpoint (RB): {float(self.temperature_heaterset_rb.get()):7.2f} °C")
print(f" Heater Enabled: {float(self.temperature_heater_enabled.get()):.0f}")
print(f" Heater Housing: {float(self.temperature_heaterhousing.get()):7.2f} °C")
print(f" Heater Housing Alarm: {float(self.temperature_heaterhousing_alarm.get()):.0f}")
print("")
print("Humidity Sensors:")
print(f" Sensor 1 Humidity: {float(self.humidity_sensor1.get()):7.2f} %RH")
print(f" Sensor 1 Temperature: {float(self.humidity_sensor1_temperature.get()):7.2f} °C")
print(f" Sensor 1 Error: {float(self.humidity_sensor1_err.get()):.0f}")
print(f" Sensor 2 Humidity: {float(self.humidity_sensor2.get()):7.2f} %RH")
print(f" Sensor 2 Temperature: {float(self.humidity_sensor2_temperature.get()):7.2f} °C")
print(f" Sensor 2 Error: {float(self.humidity_sensor2_err.get()):.0f}")
print(f" Humidity Setpoint: {float(self.humidityset.get()):7.2f} %RH")
print("")
print("Flow Control:")
print(f" Flow: {float(self.flow.get()):7.2f} sccm")
print(f" Flow Setpoint (RB): {float(self.flowset.get()):7.2f} sccm")
print("")
print("Suction:")
print(f" Suction: {float(self.suction.get()):7.2f}")
print("")
print("Valves:")
print(f" Dry Valve: {float(self.valvedry.get()):.0f}")
print(f" Wet Valve: {float(self.valvewet.get()):.0f}")
print("")
print("Controller Heartbeats:")
print(f" OMEGA Controller Alive: {self.omega_controller_running()}")
print(f" GALIL Controller Alive: {self.galil_controller_running()}")
print("==============================================")
def help(self):
print("Help for flOMNI temperature and humidity control system:")
print("Available methods:")
print(" show_all() - display all current values")

View File

@@ -37,86 +37,84 @@ class OMNYSampleStorage(Device):
_default_sub = SUB_VALUE
sample_shuttle_A_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {"auto_monitor": True}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {}) for i in range(1, 7)
}
sample_shuttle_A_placed = Dcpt(sample_shuttle_A_placed)
sample_shuttle_B_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {"auto_monitor": True}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {}) for i in range(1, 7)
}
sample_shuttle_B_placed = Dcpt(sample_shuttle_B_placed)
sample_shuttle_C_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {}) for i in range(1, 7)
}
sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed)
sample_shuttle_C_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {}) for i in range(1, 7)
}
sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed)
parking_placed = {
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {"auto_monitor": True}) for i in range(1, 7)
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {}) for i in range(1, 7)
}
parking_placed = Dcpt(parking_placed)
sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {"auto_monitor": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
}
sample_placed = Dcpt(sample_placed)
sample_shuttle_A_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True, "auto_monitor": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True})
for i in range(1, 7)
}
sample_shuttle_A_names = Dcpt(sample_shuttle_A_names)
sample_shuttle_B_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True, "auto_monitor": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True})
for i in range(1, 7)
}
sample_shuttle_B_names = Dcpt(sample_shuttle_B_names)
sample_shuttle_C_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True, "auto_monitor": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True})
for i in range(1, 7)
}
sample_shuttle_C_names = Dcpt(sample_shuttle_C_names)
parking_names = {
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True, "auto_monitor": True})
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True})
for i in range(1, 7)
}
parking_names = Dcpt(parking_names)
sample_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True, "auto_monitor": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
}
sample_names = Dcpt(sample_names)
sample_in_gripper = Cpt(
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL", auto_monitor=True
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL"
)
sample_in_gripper_name = Cpt(
EpicsSignal,
name="sample_in_gripper_name",
read_pv="XOMNY-SAMPLE_DB_omny:110.DESC",
string=True,
auto_monitor=True
)
sample_in_samplestage = Cpt(
EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL", auto_monitor=True
EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL"
)
sample_in_samplestage_name = Cpt(
EpicsSignal,
name="sample_in_samplestage_name",
read_pv="XOMNY-SAMPLE_DB_omny:0.DESC",
string=True,
auto_monitor=True
)
def __init__(self, prefix="", *, name, **kwargs):

View File

@@ -392,9 +392,6 @@ class RtFlomniController(Controller):
rtx = self.get_device_manager().devices.rtx
min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal")
print(f"low signal: {low_signal}")
print(f"min signal: {min_signal}")
print(f"signal: {signal}")
if signal < min_signal:
time.sleep(1)
if signal < min_signal:

View File

@@ -1,82 +0,0 @@
import time
import socket
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import EpicsSignal
class OMNYFastEpicsShutterError(Exception):
pass
def _detect_host_pv():
"""Detect host subnet and return appropriate PV name."""
try:
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
if local_ip.startswith("129.129.122."):
return "X12SA-ES1-TTL:OUT_01"
else:
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
except Exception as ex:
print(f"Warning: could not detect IP subnet ({ex}), using dummy shutter.")
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
class OMNYFastEpicsShutter(Device):
"""
Fast EPICS shutter with automatic PV selection based on host subnet.
"""
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help"]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
# PV is detected dynamically at import time
shutter = Cpt(EpicsSignal, name="shutter", read_pv=_detect_host_pv(), auto_monitor=True)
def __init__(self, prefix="", *, name, **kwargs):
super().__init__(prefix, name=name, **kwargs)
self.shutter.subscribe(self._emit_value)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self.wait_for_connection()
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
# -----------------------------------------------------
# User-facing shutter control functions
# -----------------------------------------------------
def fshopen(self):
"""Open the fast shutter."""
try:
self.shutter.put(1, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}")
def fshclose(self):
"""Close the fast shutter."""
try:
self.shutter.put(0, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}")
def fshstatus(self):
"""Return the fast shutter status (0=closed, 1=open)."""
try:
return self.shutter.get()
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}")
def fshinfo(self):
"""Print information about which EPICS PV channel is being used."""
pvname = self.shutter.pvname
print(f"Fast shutter connected to EPICS channel: {pvname}")
return pvname
def help(self):
"""Display available user methods."""
print("Available methods:")
for method in self.USER_ACCESS:
print(f" - {method}")

View File

@@ -1,65 +0,0 @@
import requests
import threading
import cv2
import numpy as np
from ophyd import Device, Component as Cpt
from ophyd_devices import PreviewSignal
import traceback
from bec_lib.logger import bec_logger
logger = bec_logger.logger
class WebcamViewer(Device):
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None:
super().__init__(name=name, **kwargs)
self.url = url
self._connection = None
self._update_thread = None
self._buffer = b""
self._shutdown_event = threading.Event()
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
def start_live_mode(self) -> None:
if self._connection is not None:
return
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
def _update_loop(self) -> None:
while not self._shutdown_event.is_set():
try:
self._connection = requests.get(self.url, stream=True)
for chunk in self._connection.iter_content(chunk_size=1024):
self._buffer += chunk
start = self._buffer.find(b'\xff\xd8') # JPEG start
end = self._buffer.find(b'\xff\xd9') # JPEG end
if start == -1 or end == -1:
continue
jpg = self._buffer[start:end+2]
self._buffer = self._buffer[end+2:]
image = cv2.imdecode(np.frombuffer(jpg, np.uint8), cv2.IMREAD_COLOR)
if image is not None:
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
self.preview.put(image)
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Image update loop failed: {content}")
def stop_live_mode(self) -> None:
if self._connection is None:
return
self._shutdown_event.set()
if self._connection is not None:
self._connection.close()
self._connection = None
if self._update_thread is not None:
self._update_thread.join()
self._update_thread = None
self._shutdown_event.clear()

View File

@@ -1,74 +0,0 @@
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
class OMNYXRayEpicsGUI(Device):
save_frame = Cpt(
EpicsSignal, name="save_frame", read_pv="XOMNYI-XEYE-SAVFRAME:0",auto_monitor=True
)
update_frame_acqdone = Cpt(
EpicsSignal, name="update_frame_acqdone", read_pv="XOMNYI-XEYE-ACQDONE:0",auto_monitor=True
)
update_frame_acq = Cpt(
EpicsSignal, name="update_frame_acq", read_pv="XOMNYI-XEYE-ACQ:0",auto_monitor=True
)
width_y_dynamic = {
f"width_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YWIDTH_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_y = Dcpt(width_y_dynamic)
width_x_dynamic = {
f"width_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XWIDTH_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_x = Dcpt(width_x_dynamic)
enable_mv_x = Cpt(
EpicsSignal, name="enable_mv_x", read_pv="XOMNYI-XEYE-ENAMVX:0",auto_monitor=True
)
enable_mv_y = Cpt(
EpicsSignal, name="enable_mv_y", read_pv="XOMNYI-XEYE-ENAMVY:0",auto_monitor=True
)
send_message = Cpt(
EpicsSignal, name="send_message", read_pv="XOMNYI-XEYE-MESSAGE:0.DESC",auto_monitor=True
)
sample_name = Cpt(
EpicsSignal, name="sample_name", read_pv="XOMNYI-XEYE-SAMPLENAME:0.DESC",auto_monitor=True
)
angle = Cpt(
EpicsSignal, name="angle", read_pv="XOMNYI-XEYE-ANGLE:0",auto_monitor=True
)
pixel_size = Cpt(
EpicsSignal, name="pixel_size", read_pv="XOMNYI-XEYE-PIXELSIZE:0",auto_monitor=True
)
submit = Cpt(
EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True
)
step = Cpt(
EpicsSignal, name="step", read_pv="XOMNYI-XEYE-STEP:0",auto_monitor=True
)
xval_x_dynamic = {
f"xval_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XVAL_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
xval_x = Dcpt(xval_x_dynamic)
yval_y_dynamic = {
f"yval_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YVAL_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
yval_y = Dcpt(yval_y_dynamic)
recbg = Cpt(
EpicsSignal, name="recbg", read_pv="XOMNYI-XEYE-RECBG:0",auto_monitor=True
)
stage_pos_x_dynamic = {
f"stage_pos_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-STAGEPOSX:{i}", {"auto_monitor": True}) for i in range(1, 6)
}
stage_pos_x = Dcpt(stage_pos_x_dynamic)
mvx = Cpt(
EpicsSignal, name="mvx", read_pv="XOMNYI-XEYE-MVX:0",auto_monitor=True
)
mvy = Cpt(
EpicsSignal, name="mvy", read_pv="XOMNYI-XEYE-MVY:0",auto_monitor=True
)

View File

@@ -33,11 +33,11 @@ logger = bec_logger.logger
class FlomniFermatScan(SyncFlyScanBase):
scan_name = "flomni_fermat_scan"
scan_report_hint = "table"
scan_type = "fly"
required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
use_scan_progress_report = True
def __init__(
self,
@@ -74,7 +74,6 @@ class FlomniFermatScan(SyncFlyScanBase):
"""
super().__init__(parameter=parameter, **kwargs)
self.show_live_table = False
self.axis = []
self.fovx = fovx
self.fovy = fovy
@@ -169,7 +168,6 @@ class FlomniFermatScan(SyncFlyScanBase):
tracker_signal_status = yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.laser_tracker_check_signalstrength"
)
#self.device_manager.connector.send_client_info(tracker_signal_status)
if tracker_signal_status == "low":
self.device_manager.connector.raise_alarm(
severity=0,

View File

@@ -88,14 +88,6 @@ Depending on the tomo mode following parameters can be given to the `flomni.tomo
| Golden ratio tomography (sorted in bunches) | projection_number=None |
| Equally spaced with golden starting angle | projection_number=None |
### GUI tools
During operation the BEC GUI will show the relevant cameras or progress information. To manually switch view TAB completion on 'flomni.flomnigui_' will show all options to control the GUI. Most useful
'flomni.flomnigui_show_cameras()' will show the cameras for sample transfer and interior overview
'flomni.flomnigui_show_progress()' will show the measurement progress GUI
'flomnigui_show_xeyealign()' will show the XrayEye alignment GUI
## How to setup flOMNI (software)
This part of the manual is intended for beamline staff and expert users
@@ -226,15 +218,9 @@ Update the values by, example for feyex and in position,
To refresh the frame of the xray eye windows software
`flomni.xrayeye_update_frame()`
This command can also be called to keep the shutter open and live view active
`flomni.xrayeye_update_frame(keep_shutter_open=True)`
To start the xray eye alignment (and clear any previous alignment)
`flomni.xrayeye_alignment_start()`
This command can also be called to keep the shutter open and live view active. Warning: The dose to the sample will be significantly higher.
`flomni.xrayeye_update_frame(keep_shutter_open=True)`
To load the fit parameters from directory _dir_path_ computed by _SPEC_ptycho_align.m_ in Matlab run
`flomni.read_alignment_offset(dir_path='')`
The loading routine uses default values for the vertical alignment. This behavior can be changed (e.g. for getting new default values) by the parameter `use_vertical_default_values=False`.

View File

@@ -16,7 +16,7 @@ dependencies = [
"bec_ipython_client",
"bec_lib",
"bec_server",
"ophyd_devices~=1.29",
"ophyd_devices",
"std_daq_client",
"jfjoch-client",
"rich",
@@ -24,7 +24,6 @@ dependencies = [
"pyueye", # for the IDS uEye camera
"bec_widgets",
"zmq",
"opencv-python",
]
[project.optional-dependencies]

View File

@@ -5,7 +5,7 @@ from unittest import mock
import numpy as np
import pytest
from csaxs_bec.devices.ids_cameras.ids_camera import IDSCamera
from csaxs_bec.devices.ids_cameras.ids_camera_new import IDSCamera
@pytest.fixture(scope="function")