Compare commits

..

25 Commits

Author SHA1 Message Date
x01dc
22d8dbe972 added force monochrome mode plus delay after disconnect to ensure
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m45s
CI for csaxs_bec / test (push) Successful in 1m18s
working after config reload
2025-11-12 13:36:12 +01:00
x01dc
2411e7be56 new shutter device
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m17s
CI for csaxs_bec / test (push) Successful in 1m17s
2025-11-07 14:34:05 +01:00
f8b20752f5 updated gui section
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m22s
2025-11-05 13:12:59 +01:00
e301b94e7c fix: add num_rotation_90 and transpose to ids_camera
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m0s
CI for csaxs_bec / test (push) Successful in 1m25s
2025-11-04 14:12:04 +01:00
x01dc
61011f098d attribute based checking if window exists
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 42s
CI for csaxs_bec / test (push) Successful in 1m33s
2025-11-04 13:39:35 +01:00
x01dc
efca170f04 minor fixes: basepath correction file, raise gui
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 38s
CI for csaxs_bec / test (push) Successful in 1m21s
2025-11-03 11:23:32 +01:00
x01dc
af2a69f825 various fixes or adjustments during testing the new alignment gui
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m18s
CI for csaxs_bec / test (push) Successful in 1m18s
2025-10-31 11:38:27 +01:00
c8c71d466c refactor(xray_gui): minor cleanup
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m17s
CI for csaxs_bec / test (push) Successful in 1m14s
2025-10-23 14:39:59 +02:00
94d984b8a2 fix(camera): BGR to RGB logic moved to Camera 2025-10-23 14:39:59 +02:00
x01dc
39d2c97247 fix: xray eye align script changes 2025-10-23 14:39:59 +02:00
x01dc
de22611941 refactor(ids_camera): old ids_camera deleted, ids_camera_new is now ids_camera 2025-10-23 14:39:59 +02:00
4723f6768b feat(xray_eye): add XRayEye widget and plugin for GUI integration 2025-10-23 11:07:30 +02:00
x01dc
5b76c3f769 feat(gui_instruction_device): added gui instruction device from epics 2025-10-23 11:04:21 +02:00
4590b85010 fix(omny_alignment): disabled not working signal connection to on_move_up 2025-10-23 11:02:28 +02:00
7233fb8d35 w
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m16s
CI for csaxs_bec / test (push) Successful in 1m19s
2025-10-22 10:55:58 +02:00
32d3232008 fix(ids-camera): fix roi_signal after AsyncSignal refactoring
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 3s
CI for csaxs_bec / test (push) Failing after 23s
2025-10-22 10:48:40 +02:00
x01dc
725eed17ed finalized flomni temp and humidity display
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m16s
CI for csaxs_bec / test (push) Successful in 1m15s
2025-10-14 11:41:31 +02:00
x01dc
1d9fb39c0e initial version flomni temp humidity 2025-10-14 11:41:31 +02:00
x01dc
f1dd299fad gui tools running 2025-10-14 11:41:31 +02:00
1fcb213336 ci: pull from github instead of gitea
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m23s
CI for csaxs_bec / test (push) Successful in 1m22s
2025-10-08 16:33:34 +02:00
x01dc
b08c7bf44b removed debug info
Some checks failed
CI for csaxs_bec / test (push) Failing after 34s
CI for csaxs_bec / test (pull_request) Failing after 35s
2025-10-08 16:23:38 +02:00
x01dc
d16f6b703c tested gui tools 2025-10-08 16:23:38 +02:00
x01dc
f526d5cc05 gui tools running 2025-10-08 16:23:38 +02:00
gac-x01dc
8f7914b978 - added camera viewer device
- fixed some issues in flomni sample
storage device
2025-10-08 16:22:13 +02:00
gac-x01dc
6c65d5546c fixed issue with flomni storage not updating and start using OMNYTools for yesno 2025-10-08 16:22:13 +02:00
31 changed files with 1747 additions and 806 deletions

View File

@@ -47,25 +47,13 @@ jobs:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
- name: Checkout BEC Core
uses: actions/checkout@v4
with:
repository: bec/bec
ref: "${{ inputs.BEC_CORE_BRANCH || 'main' }}"
path: ./bec
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec
- name: Checkout Ophyd Devices
uses: actions/checkout@v4
with:
repository: bec/ophyd_devices
ref: "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}"
path: ./ophyd_devices
run: git clone --depth 1 --branch "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}" https://github.com/bec-project/ophyd_devices.git ./ophyd_devices
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec/bec_widgets
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
path: ./bec_widgets
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4

View File

@@ -14,6 +14,8 @@ from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_optics_mixin import FlomniOpticsMixin
from csaxs_bec.bec_ipython_client.plugins.flomni.x_ray_eye_align import XrayEyeAlign
from csaxs_bec.bec_ipython_client.plugins.flomni.gui_tools import flomniGuiTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
logger = bec_logger.logger
@@ -24,27 +26,57 @@ if builtins.__dict__.get("bec") is not None:
umvr = builtins.__dict__.get("umvr")
class FlomniToolsError(Exception):
pass
class FlomniInitError(Exception):
pass
class FlomniError(Exception):
pass
class FlomniTools:
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
if autoconfirm and default == "y":
self.printgreen(message + " Automatically confirming default: yes")
return True
elif autoconfirm and default == "n":
self.printgreen(message + " Automatically confirming default: no")
return False
if default == "y":
message_ending = " [Y]/n? "
elif default == "n":
message_ending = " y/[N]? "
else:
message_ending = " y/n? "
while True:
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
if (
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
) or (default == "y" and user_input == ""):
return True
if (
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
) or (default == "n" and user_input == ""):
return False
else:
print("Please expicitely confirm y or n.")
class FlomniInitStagesMixin:
def flomni_init_stages(self):
user_input = input("Starting initialization of flOMNI stages. OK? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Starting initialization of flOMNI stages. OK?"):
print("staring...")
else:
return
if self.check_all_axes_of_fomni_referenced():
user_input = input("Continue anyways? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
print("ok then...")
else:
return
@@ -74,10 +106,8 @@ class FlomniInitStagesMixin:
dev.feyex.limits = [-30, -1]
print("done")
user_input = input(
"Init of foptz. Can the stage move to the upstream limit without collision? [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("Init of foptz. Can the stage move to the upstream limit without collision?"):
print("good then")
else:
return
@@ -131,10 +161,7 @@ class FlomniInitStagesMixin:
dev.fsamy.limits = [2, 3.1]
print("done")
user_input = input(
"Init of tracking stages. Did you remove the outer laser flight tubes? [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("Init of tracking stages. Did you remove the outer laser flight tubes?"):
print("good then")
else:
print("Stopping.")
@@ -150,8 +177,7 @@ class FlomniInitStagesMixin:
dev.ftrackz.limits = [4.5, 5.5]
print("done")
user_input = input("Init of sample stage. Is the piezo at about 0 deg? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Init of sample stage. Is the piezo at about 0 deg?"):
print("good then")
else:
print("Stopping.")
@@ -168,11 +194,7 @@ class FlomniInitStagesMixin:
print("done")
print("Initializing UPR stage.")
user_input = input(
"To ensure that the end switches work, please check that they are currently not pushed."
" Is everything okay? [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"):
print("good then")
else:
print("Stopping.")
@@ -193,8 +215,7 @@ class FlomniInitStagesMixin:
time.sleep(1)
continue
break
user_input = input("Shall I start the index search? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Shall I start the index search?"):
print("good then. Starting index search.")
else:
print("Stopping.")
@@ -213,11 +234,7 @@ class FlomniInitStagesMixin:
dev.fsamroy.limits = [-5, 365]
print("done")
user_input = input(
"Init of foptx. Can the stage move to the positive limit without collision? Attention:"
" tracker flight tube! [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"):
print("good then")
else:
print("Stopping.")
@@ -241,8 +258,7 @@ class FlomniInitStagesMixin:
continue
break
user_input = input("Start limit switch search of fopty? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Start limit switch search of fopty?"):
print("good then")
else:
print("Stopping.")
@@ -275,8 +291,7 @@ class FlomniInitStagesMixin:
return False
def set_limits(self):
user_input = input("Set default limits for flOMNI? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Set default limits for flOMNI?"):
print("setting limits...")
else:
print("Stopping.")
@@ -303,8 +318,7 @@ class FlomniInitStagesMixin:
dev.ftrackz.limits = [4.5, 5.5]
def _align_setup(self):
user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Start moving stages to default initial positions?", "y"):
print("Start moving stages...")
else:
print("Stopping.")
@@ -397,7 +411,8 @@ class FlomniSampleTransferMixin:
raise FlomniError("Ftray is not at the 'IN' position. Aborting.")
def ftransfer_flomni_stage_in(self):
sample_in_position = bool(float(dev.flomni_samples.sample_placed.sample0.get()))
sample_in_position = dev.flomni_samples.is_sample_slot_used(0)
#bool(float(dev.flomni_samples.sample_placed.sample0.get()))
if not sample_in_position:
raise FlomniError("There is no sample in the sample stage. Aborting.")
self.reset_correction()
@@ -410,6 +425,8 @@ class FlomniSampleTransferMixin:
umv(dev.fsamx, fsamx_in)
dev.fsamx.limits = [fsamx_in - 0.4, fsamx_in + 0.4]
self.flomnigui_idle()
def laser_tracker_show_all(self):
dev.rtx.controller.laser_tracker_show_all()
@@ -449,6 +466,10 @@ class FlomniSampleTransferMixin:
self.device_manager.devices.fsamx.controller.lights_on()
def ftransfer_flomni_stage_out(self):
self.flomnigui_show_cameras()
self.flomnigui_raise()
target_pos = -162
if np.isclose(dev.fsamx.readback.get(), target_pos, 0.01):
return
@@ -496,22 +517,20 @@ class FlomniSampleTransferMixin:
self.check_tray_in()
self.check_sensor_connected()
sample_in_gripper = bool(float(dev.flomni_samples.sample_in_gripper.get()))
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
if sample_in_gripper:
raise FlomniError(
"The gripper does carry a sample. Cannot proceed getting another sample."
)
sample_signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
sample_in_position = bool(float(sample_signal.get()))
sample_in_position = dev.flomni_samples.is_sample_slot_used(position)
if not sample_in_position:
raise FlomniError(f"The planned pick position [{position}] does not have a sample.")
user_input = input(
"Please confirm that there is currently no sample in the gripper. It would be dropped!"
" [y/n]"
)
if user_input == "y":
self.flomnigui_show_cameras()
if self.OMNYTools.yesno("Please confirm that there is currently no sample in the gripper. It would be dropped!", "y"):
print("good then")
else:
print("Stopping.")
@@ -555,12 +574,12 @@ class FlomniSampleTransferMixin:
self.check_tray_in()
self.check_sensor_connected()
sample_in_gripper = bool(float(dev.flomni_samples.sample_in_gripper.get()))
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
#bool(float(dev.flomni_samples.sample_in_gripper.get()))
if not sample_in_gripper:
raise FlomniError("The gripper does not carry a sample.")
sample_signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
sample_in_position = bool(float(sample_signal.get()))
sample_in_position = dev.flomni_samples.is_sample_slot_used(position)
if sample_in_position:
raise FlomniError(f"The planned put position [{position}] already has a sample.")
@@ -593,8 +612,9 @@ class FlomniSampleTransferMixin:
self.flomni_modify_storage_non_interactive(100, 0, "-")
self.flomni_modify_storage_non_interactive(position, 1, sample_name)
# TODO: flomni_stage_in if position == 0
# bec.queue.next_dataset_number += 1
if position == 0:
self.ftransfer_flomni_stage_in()
bec.queue.next_dataset_number += 1
def sample_get_name(self, position: int = 0) -> str:
"""
@@ -605,36 +625,51 @@ class FlomniSampleTransferMixin:
def ftransfer_sample_change(self, new_sample_position: int):
self.check_tray_in()
sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
if sample_in_gripper:
raise FlomniError("There is already a sample in the gripper. Aborting.")
self.check_position_is_valid(new_sample_position)
sample_placed = getattr(
dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
).get()
if new_sample_position == 0:
raise FlomniError("The new sample to place cannot be the sample in the sample stage. Aborting.")
# sample_placed = getattr(
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
# ).get()
sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position)
if not sample_placed:
raise FlomniError(
f"There is currently no sample in position [{new_sample_position}]. Aborting."
)
sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0)
if sample_in_sample_stage:
# find a new home for the sample...
empty_slots = []
for name, val in dev.flomni_samples.read().items():
if "flomni_samples_sample_placed_sample" not in name:
continue
if val.get("value") == 0:
empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
# for name, val in dev.flomni_samples.read().items():
# if "flomni_samples_sample_placed_sample" not in name:
# continue
# if val.get("value") == 0:
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
for j in range(1,20):
if not dev.flomni_samples.is_sample_slot_used(j):
empty_slots.append(j)
if not empty_slots:
raise FlomniError("There are no empty slots available. Aborting.")
print(f"The following slots are empty: {empty_slots}.")
while True:
user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}]")
user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}] ")
if user_input.strip() == "":
# No entry: use default
user_input = empty_slots[0]
break
try:
user_input = int(user_input)
if user_input not in empty_slots:
@@ -700,20 +735,20 @@ class FlomniSampleTransferMixin:
if confirm != -1:
return
user_input = input("All OK? Continue? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("All OK? Continue?", "y"):
print("good then")
dev.ftransy.controller.socket_put_confirmed("confirm=1")
else:
print("Stopping.")
return
exit
def ftransfer_gripper_is_open(self) -> bool:
status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip()))
return status
def ftransfer_gripper_open(self):
sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
#dev.flomni_samples.sample_in_gripper.get()
if sample_in_gripper:
raise FlomniError(
"Cannot open gripper. There is still a sample in the gripper! Aborting."
@@ -733,11 +768,8 @@ class FlomniSampleTransferMixin:
fsamx_pos = dev.fsamx.readback.get()
if position == 0 and fsamx_pos > -160:
user_input = input(
"May the flomni stage be moved out for the sample change? Feedback will be disabled"
" and alignment will be lost! [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno("May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", "y"):
print("good then")
self.ftransfer_flomni_stage_out()
else:
@@ -892,7 +924,20 @@ class FlomniSampleTransferMixin:
class FlomniAlignmentMixin:
default_correction_file = "correction_flomni_20210300_360deg.txt"
import csaxs_bec
import os
from pathlib import Path
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
default_correction_file_rel = "correction_flomni_20210300_360deg.txt"
# Build the absolute path correctly
default_correction_file = (
csaxs_bec_basepath.parent / 'bec_ipython_client' / 'plugins' / 'flomni' / default_correction_file_rel
).resolve()
def reset_correction(self, use_default_correction=True):
"""
@@ -1106,6 +1151,7 @@ class Flomni(
FlomniAlignmentMixin,
FlomniOpticsMixin,
cSAXSBeamlineChecks,
flomniGuiTools
):
def __init__(self, client):
super().__init__()
@@ -1128,16 +1174,23 @@ class Flomni(
self.corr_pos_y_2 = []
self.corr_angle_y_2 = []
self.progress = {}
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 1
self.progress["projection"] = 0
self.progress["total_projections"] = 1
self.progress["angle"] = 0
self.progress["tomo_type"] = 0
self.OMNYTools = OMNYTools(self.client)
self.align = XrayEyeAlign(self.client, self)
self.set_client(client)
def start_x_ray_eye_alignment(self):
user_input = input(
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample. [Y/n]"
)
if user_input == "y" or user_input == "":
def start_x_ray_eye_alignment(self, keep_shutter_open=False):
if self.OMNYTools.yesno("Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y"):
self.align = XrayEyeAlign(self.client, self)
try:
self.align.align()
self.align.align(keep_shutter_open)
except KeyboardInterrupt as exc:
fsamx_in = self._get_user_param_safe(dev.fsamx, "in")
if np.isclose(fsamx_in, dev.fsamx.readback.get(), 0.5):
@@ -1146,11 +1199,11 @@ class Flomni(
umv(dev.fsamx, fsamx_in)
raise exc
def xrayeye_update_frame(self):
self.align.update_frame()
def xrayeye_update_frame(self,keep_shutter_open=False):
self.align.update_frame(keep_shutter_open)
def xrayeye_alignment_start(self):
self.start_x_ray_eye_alignment()
def xrayeye_alignment_start(self, keep_shutter_open=False):
self.start_x_ray_eye_alignment(keep_shutter_open)
def drive_axis_to_limit(self, device, direction):
axis_id = device._config["deviceConfig"].get("axis_Id")
@@ -1570,6 +1623,9 @@ class Flomni(
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
"""start a tomo scan"""
self.flomnigui_show_progress()
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy()
@@ -1706,6 +1762,7 @@ class Flomni(
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
self._flomnigui_update_progress()
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
@@ -1795,7 +1852,7 @@ class Flomni(
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
"~/Data10/specES1/dat-files/tomography_scannumbers.txt"
"~/tomography_scannumbers.txt"
)
with open(tomo_scan_numbers_file, "a+") as out_file:
# pylint: disable=undefined-variable
@@ -1894,8 +1951,8 @@ class Flomni(
)
print(f"\nSample name: {self.sample_name}\n")
user_input = input("Are these parameters correctly set for your scan? [Y/n]")
if user_input == "y" or user_input == "":
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
print("... excellent!")
else:
self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)

View File

@@ -33,7 +33,7 @@ class FlomniOpticsMixin:
feyex_in = self._get_user_param_safe("feyex", "in")
feyey_in = self._get_user_param_safe("feyey", "in")
umv(dev.feyex, feyex_in, dev.feyey, feyey_in)
self.align.update_frame()
#self.align.update_frame()
def _ffzp_in(self):
foptx_in = self._get_user_param_safe("foptx", "in")

View File

@@ -0,0 +1,173 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class flomniGuiToolsError(Exception):
pass
class flomniGuiTools:
def __init__(self):
self.text_box = None
self.progressbar = None
def set_client(self, client):
self.client = client
self.gui = self.client.gui
def flomnigui_show_gui(self):
if "flomni" in self.gui.windows:
self.gui.flomni.show()
else:
self.gui.new("flomni")
def flomnigui_stop_gui(self):
self.gui.flomni.hide()
def flomnigui_raise(self):
self.gui.flomni.raise_window()
# def flomnigui_show_xeyealign(self):
# self.flomnigui_show_gui()
# if self.xeyegui is None:
# self.flomnigui_remove_all_docks()
# self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# # start live
# if not dev.cam_xeye.live_mode:
# dev.cam_xeye.live_mode = True
def flomnigui_show_xeyealign(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# start live
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
def _flomnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui,"flomni"):
if hasattr(self.gui.flomni,attribute_name):
return False
return True
def flomnigui_show_cameras(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"):
self.flomnigui_remove_all_docks()
camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
camera_gripper_image.image(("cam_flomni_gripper", "preview"))
camera_gripper_image.lock_aspect_ratio = True
camera_gripper_image.enable_fps_monitor = True
camera_gripper_image.enable_toolbar = False
camera_gripper_image.outer_axes = False
camera_gripper_image.inner_axes = False
dev.cam_flomni_gripper.start_live_mode()
else:
print("Cannot open camera_gripper. Device does not exist.")
camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
camera_overview_image.image(("cam_flomni_overview", "preview"))
camera_overview_image.lock_aspect_ratio = True
camera_overview_image.enable_fps_monitor = True
camera_overview_image.enable_toolbar = False
camera_overview_image.outer_axes = False
camera_overview_image.inner_axes = False
dev.cam_flomni_overview.start_live_mode()
else:
print("Cannot open camera_overview. Device does not exist.")
def flomnigui_remove_all_docks(self):
dev.cam_flomni_overview.stop_live_mode()
dev.cam_flomni_gripper.stop_live_mode()
dev.cam_xeye.live_mode = False
self.gui.flomni.delete_all()
self.progressbar = None
self.text_box = None
def flomnigui_idle(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("idle_text_box"):
self.flomnigui_remove_all_docks()
idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
text = (
"<pre>"
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n"
+ "/ .-'| |' .-. '| `.' || ,'.| || | \n"
+ "| `-,| || | | || |'.'| || |' ' || | \n"
+ "| .-'| |' '-' '| | | || | ` || | \n"
+ "`--' `--' `-----' `--' `--'`--' `--'`--' \n"
+ "</pre>"
)
idle_text_box.set_html_text(text)
def _flomnicam_check_device_exists(self, device):
try:
device
except:
return False
else:
return True
def flomnigui_show_progress(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("progressbar"):
self.flomnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
self.progressbar.enable_auto_updates(False)
# Set precision for the self.progressbar display
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
# Setting multiple rigns with different values
self.progressbar.set_number_of_bars(3)
self.progressbar.rings[0].set_update("manual")
self.progressbar.rings[1].set_update("manual")
self.progressbar.rings[2].set_update("scan")
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.flomni.new(name="progress_text").new("TextBox")
self._flomnigui_update_progress()
def _flomnigui_update_progress(self):
if self.progressbar is not None:
progress = self.progress["projection"] / self.progress["total_projections"] * 100
subtomo_progress = (
self.progress["subtomo_projection"]
/ self.progress["subtomo_total_projections"]
* 100
)
self.progressbar.set_value([progress, subtomo_progress, 0])
if self.text_box is not None:
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
self.text_box.set_plain_text(text)
if __name__ == "__main__":
from bec_lib.client import BECClient
from bec_widgets.cli.client_utils import BECGuiClient
client = BECClient()
client.start()
client.gui = BECGuiClient()
flomni_gui = flomniGuiTools(client)
flomni_gui.flomnigui_show_gui()
flomni_gui.flomnigui_show_progress()

View File

@@ -7,7 +7,7 @@ from typing import TYPE_CHECKING
from bec_lib import bec_logger
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
logger = bec_logger.logger
# import builtins to avoid linter errors
@@ -22,6 +22,7 @@ if TYPE_CHECKING:
class XrayEyeAlign:
# pixel calibration, multiply to get mm
labview=False
PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning
def __init__(self, client, flomni: Flomni) -> None:
@@ -40,28 +41,40 @@ class XrayEyeAlign:
def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self):
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
# start live
def update_frame(self,keep_shutter_open=False):
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
if not self.labview:
self.flomni.flomnigui_show_xeyealign()
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
epics_put("XOMNYI-XEYE-ACQ:0", 1)
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
if self.labview:
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
fshopen()
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
time.sleep(0.5)
# stop live view
epics_put("XOMNYI-XEYE-ACQ:0", 0)
time.sleep(1)
# fshclose
print("got new frame")
if not keep_shutter_open:
epics_put("XOMNYI-XEYE-ACQ:0", 0)
time.sleep(0.1)
fshclose()
print("got new frame")
else:
print("Staying in live view, shutter is and remains open!")
def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable
@@ -87,12 +100,23 @@ class XrayEyeAlign:
def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
def align(self):
def align(self,keep_shutter_open=False):
if not keep_shutter_open:
print("This routine can be called with paramter keep_shutter_open=True to keep the shutter always open")
self.send_message("Getting things ready. Please wait...")
#potential unresolved movement requests to zero
epics_put("XOMNYI-XEYE-MVX:0", 0)
epics_put("XOMNYI-XEYE-MVY:0", 0)
# reset shift xy and fov params
self._reset_init_values()
self.flomni.lights_off()
self.flomni.flomnigui_show_xeyealign()
self.flomni.flomnigui_raise()
self.tomo_rotate(0)
epics_put("XOMNYI-XEYE-ANGLE:0", 0)
@@ -119,7 +143,7 @@ class XrayEyeAlign:
umv(dev.fsamx, fsamx_in - 0.25)
self.flomni.ffzp_in()
self.update_frame()
self.update_frame(keep_shutter_open)
# enable submit buttons
self.movement_buttons_enabled = False
@@ -152,17 +176,18 @@ class XrayEyeAlign:
self.flomni.feedback_disable()
umv(dev.fsamx, fsamx_in - 0.25)
self.update_frame()
epics_put("XOMNYI-XEYE-RECBG:0", 1)
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
time.sleep(0.5)
print("waiting for background frame...")
if self.labview:
self.update_frame(keep_shutter_open)
epics_put("XOMNYI-XEYE-RECBG:0", 1)
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
time.sleep(0.5)
print("waiting for background frame...")
umv(dev.fsamx, fsamx_in)
time.sleep(0.5)
self.flomni.feedback_enable_with_reset()
self.update_frame()
self.update_frame(keep_shutter_open)
self.send_message("Adjust sample height and submit center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled = True
@@ -175,7 +200,7 @@ class XrayEyeAlign:
umv(dev.rtx, 0)
self.tomo_rotate(k * 45)
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame()
self.update_frame(keep_shutter_open)
self.send_message("Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
@@ -199,7 +224,7 @@ class XrayEyeAlign:
if k > 0:
epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000)
time.sleep(3)
self.update_frame()
self.update_frame(keep_shutter_open)
if k < 2:
# allow movements, store movements to calculate center
@@ -210,7 +235,7 @@ class XrayEyeAlign:
time.sleep(2)
epics_put("XOMNYI-XEYE-MVY:0", 0)
self.flomni.feedback_enable_with_reset()
self.update_frame()
self.update_frame(keep_shutter_open)
time.sleep(0.2)
self.write_output()
@@ -221,8 +246,16 @@ class XrayEyeAlign:
umv(dev.rtx, 0)
# free camera
epics_put("XOMNYI-XEYE-ACQ:0", 2)
# free camera
if self.labview:
epics_put("XOMNYI-XEYE-ACQ:0", 2)
if keep_shutter_open and not self.labview:
if self.flomni.OMNYTools.yesno("Close the shutter now?","y"):
fshclose()
epics_put("XOMNYI-XEYE-ACQ:0", 0)
if not self.labview:
self.flomni.flomnigui_idle()
print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy"

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
logger = bec_logger.logger
@@ -14,6 +14,7 @@ logger = bec_logger.logger
_Widgets = {
"OmnyAlignment": "OmnyAlignment",
"XRayEye": "XRayEye",
}
@@ -73,3 +74,75 @@ class OmnyAlignment(RPCBase):
"""
None
"""
class XRayEye(RPCBase):
@rpc_call
def active_roi(self) -> "BaseROI | None":
"""
Return the currently active ROI, or None if no ROI is active.
"""
@property
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@property
@rpc_call
def user_message(self):
"""
None
"""
@user_message.setter
@rpc_call
def user_message(self):
"""
None
"""
@property
@rpc_call
def sample_name(self):
"""
None
"""
@sample_name.setter
@rpc_call
def sample_name(self):
"""
None
"""
@property
@rpc_call
def enable_move_buttons(self):
"""
None
"""
@enable_move_buttons.setter
@rpc_call
def enable_move_buttons(self):
"""
None
"""
class XRayEye2DControl(RPCBase):
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""

View File

@@ -63,7 +63,7 @@ class OmnyAlignment(BECWidget, QWidget):
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
self.ui.moveUpButton.clicked.connect(self.on_move_up)
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
@property
@@ -98,10 +98,10 @@ class OmnyAlignment(BECWidget, QWidget):
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam200")
image.image("cam_xeye")
return
image.disconnect_monitor("cam200")
image.disconnect_monitor("cam_xeye")
@property

View File

@@ -86,7 +86,7 @@
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam200</string>
<string>cam_xeye</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>

View File

@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye_plugin import XRayEyePlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(XRayEyePlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -0,0 +1,426 @@
from __future__ import annotations
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import (
QFrame,
QGridLayout,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QSizePolicy,
QSpinBox,
QToolButton,
QVBoxLayout,
QWidget,
)
logger = bec_logger.logger
CAMERA = ("cam_xeye", "image")
class XRayEye2DControl(BECWidget, QWidget):
def __init__(self, parent=None, step_size: int = 100, *arg, **kwargs):
super().__init__(parent=parent, *arg, **kwargs)
self.get_bec_shortcuts()
self._step_size = step_size
self.root_layout = QGridLayout(self)
self.setStyleSheet("""
QToolButton {
border: 1px solid;
border-radius: 4px;
}
""")
# Up
self.move_up_button = QToolButton(parent=self)
self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
self.root_layout.addWidget(self.move_up_button, 0, 2)
# Up tweak button
self.move_up_tweak_button = QToolButton(parent=self)
self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
# Left
self.move_left_button = QToolButton(parent=self)
self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
self.root_layout.addWidget(self.move_left_button, 2, 0)
# Left tweak button
self.move_left_tweak_button = QToolButton(parent=self)
self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
# Right
self.move_right_button = QToolButton(parent=self)
self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
self.root_layout.addWidget(self.move_right_button, 2, 4)
# Right tweak button
self.move_right_tweak_button = QToolButton(parent=self)
self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
# Down
self.move_down_button = QToolButton(parent=self)
self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
self.root_layout.addWidget(self.move_down_button, 4, 2)
# Down tweak button
self.move_down_tweak_button = QToolButton(parent=self)
self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
# Connections
self.move_up_button.clicked.connect(lambda: self.move("up", tweak=False))
self.move_up_tweak_button.clicked.connect(lambda: self.move("up", tweak=True))
self.move_down_button.clicked.connect(lambda: self.move("down", tweak=False))
self.move_down_tweak_button.clicked.connect(lambda: self.move("down", tweak=True))
self.move_left_button.clicked.connect(lambda: self.move("left", tweak=False))
self.move_left_tweak_button.clicked.connect(lambda: self.move("left", tweak=True))
self.move_right_button.clicked.connect(lambda: self.move("right", tweak=False))
self.move_right_tweak_button.clicked.connect(lambda: self.move("right", tweak=True))
@SafeProperty(int)
def step_size(self) -> int:
return self._step_size
@step_size.setter
def step_size(self, step_size: int):
self._step_size = step_size
@SafeSlot(bool)
def enable_controls_hor(self, enable: bool):
self.move_left_button.setEnabled(enable)
self.move_left_tweak_button.setEnabled(enable)
self.move_right_button.setEnabled(enable)
self.move_right_tweak_button.setEnabled(enable)
@SafeSlot(bool)
def enable_controls_ver(self, enable: bool):
self.move_up_button.setEnabled(enable)
self.move_up_tweak_button.setEnabled(enable)
self.move_down_button.setEnabled(enable)
self.move_down_tweak_button.setEnabled(enable)
def move(self, direction: str, tweak: bool = False):
step = self._step_size
if tweak:
step = int(self._step_size / 5)
if direction == "up":
self.dev.omny_xray_gui.mvy.set(step)
elif direction == "down":
self.dev.omny_xray_gui.mvy.set(-step)
elif direction == "left":
self.dev.omny_xray_gui.mvx.set(-step)
elif direction == "right":
self.dev.omny_xray_gui.mvx.set(step)
else:
logger.warning(f"Unknown direction {direction} for move command.")
class XRayEye(BECWidget, QWidget):
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self.get_bec_shortcuts()
self._init_ui()
self._make_connections()
# Connection to redis endpoints
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
self.connect_motors()
self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger)
def _init_ui(self):
self.core_layout = QHBoxLayout(self)
self.image = Image(parent=self)
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
self.image.inner_axes = False # Disable inner axes to maximize image area
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
# Control panel on the right: vertical layout inside a fixed-width widget
self.control_panel = QWidget(parent=self)
self.control_panel_layout = QVBoxLayout(self.control_panel)
self.control_panel_layout.setContentsMargins(0, 0, 0, 0)
self.control_panel_layout.setSpacing(10)
# ROI toolbar + Live toggle (header row)
self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
compact_orientation="horizontal")
header_row = QHBoxLayout()
header_row.setContentsMargins(0, 0, 0, 0)
header_row.setSpacing(8)
header_row.addWidget(self.roi_manager, 0)
header_row.addStretch()
self.live_preview_label = QLabel("Live Preview", parent=self)
self.live_preview_toggle = ToggleSwitch(parent=self)
self.live_preview_toggle.checked = False
header_row.addWidget(self.live_preview_label, 0, Qt.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignVCenter)
self.control_panel_layout.addLayout(header_row)
# separator
self.control_panel_layout.addWidget(self._create_separator())
# 2D Positioner (fixed size)
self.motor_control_2d = XRayEye2DControl(parent=self)
self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignTop | Qt.AlignCenter)
# separator
self.control_panel_layout.addWidget(self._create_separator())
# Step size label
step_size_form = QGridLayout()
# General Step size
self.step_size = QSpinBox(parent=self)
self.step_size.setRange(10, 100)
self.step_size.setSingleStep(10)
self.step_size.setValue(100)
# Submit button
self.submit_button = QPushButton("Submit", parent=self)
# Add to layout form
step_size_form.addWidget(QLabel("Horizontal", parent=self), 0, 0)
step_size_form.addWidget(self.step_size, 0, 1)
step_size_form.addWidget(QLabel("Vertical", parent=self), 1, 0)
step_size_form.addWidget(self.submit_button, 2, 0, 1, 2)
# Add form to control panel
self.control_panel_layout.addLayout(step_size_form)
# Push form to bottom
self.control_panel_layout.addStretch()
# Sample/Message form (bottom)
form = QGridLayout()
self.sample_name_line_edit = QLineEdit(parent=self)
self.sample_name_line_edit.setReadOnly(True)
form.addWidget(QLabel("Sample", parent=self), 0, 0)
form.addWidget(self.sample_name_line_edit, 0, 1)
self.message_line_edit = QLineEdit(parent=self)
self.message_line_edit.setReadOnly(True)
form.addWidget(QLabel("Message", parent=self), 1, 0)
form.addWidget(self.message_line_edit, 1, 1)
self.control_panel_layout.addLayout(form)
# Fix panel width and allow vertical expansion
self.control_panel.adjustSize()
p_hint = self.control_panel.sizeHint()
self.control_panel.setFixedWidth(p_hint.width())
self.control_panel.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
# Core Layout: image (expanding) | control panel (fixed)
self.core_layout.addWidget(self.image)
self.core_layout.addWidget(self.control_panel)
def _make_connections(self):
# Fetch initial state
self.on_live_view_enabled(True)
self.step_size.setValue(self.motor_control_2d.step_size)
# Make connections
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
self.submit_button.clicked.connect(self.submit)
def _create_separator(self):
sep = QFrame(parent=self)
sep.setFrameShape(QFrame.HLine)
sep.setFrameShadow(QFrame.Sunken)
sep.setLineWidth(1)
return sep
def _init_gui_trigger(self):
self.dev.omny_xray_gui.read()
################################################################################
# Device Connection logic
################################################################################
def connect_motors(self):
""" Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
for motor in possible_motors:
if motor in self.dev:
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
logger.info(f"Succesfully connected to {motor}")
################################################################################
# Properties ported from the original OmnyAlignment, can be adjusted as needed
################################################################################
@SafeProperty(str)
def user_message(self):
return self.message_line_edit.text()
@user_message.setter
def user_message(self, message: str):
self.message_line_edit.setText(message)
@SafeProperty(str)
def sample_name(self):
return self.sample_name_line_edit.text()
@sample_name.setter
def sample_name(self, message: str):
self.sample_name_line_edit.setText(message)
@SafeProperty(bool)
def enable_move_buttons(self):
return self.motor_control_2d.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled: bool):
self.motor_control_2d.setEnabled(enabled)
def active_roi(self) -> BaseROI | None:
"""Return the currently active ROI, or None if no ROI is active."""
return self.roi_manager.single_active_roi
################################################################################
# Slots ported from the original OmnyAlignment, can be adjusted as needed
################################################################################
@SafeSlot()
def get_roi_coordinates(self) -> dict | None:
"""Get the coordinates of the currently active ROI."""
roi = self.roi_manager.single_active_roi
if roi is None:
logger.warning("No active ROI")
return None
logger.info(f"Active ROI coordinates: {roi.get_coordinates()}")
return roi.get_coordinates()
@SafeSlot(bool)
def on_live_view_enabled(self, enabled: bool):
logger.info(f"Live view is enabled: {enabled}")
self.live_preview_toggle.blockSignals(True)
if enabled:
self.live_preview_toggle.checked = enabled
self.image.image(CAMERA)
self.live_preview_toggle.blockSignals(False)
return
self.image.disconnect_monitor(CAMERA)
self.live_preview_toggle.checked = enabled
self.live_preview_toggle.blockSignals(False)
@SafeSlot(bool, bool)
def on_motors_enable(self, x_enable: bool, y_enable: bool):
"""
Enable/Disable motor controls
Args:
x_enable(bool): enable x motor controls
y_enable(bool): enable y motor controls
"""
self.motor_control_2d.enable_controls_hor(x_enable)
self.motor_control_2d.enable_controls_ver(y_enable)
@SafeSlot(int)
def enable_submit_button(self, enable: int):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
if enable == -1:
self.submit_button.setEnabled(False)
else:
self.submit_button.setEnabled(True)
@SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict):
#TODO implement if needed
print(f"data: {data}")
print(f"meta: {meta}")
@SafeSlot(dict, dict)
def device_updates(self, data: dict, meta: dict):
"""
Slot to handle device updates from omny_xray_gui device.
Args:
data(dict): data from device
meta(dict): metadata from device
"""
signals = data.get('signals')
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
self.on_live_view_enabled(bool(enable_live_preview))
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
# Signals from epics gui device
# send message
user_message = signals.get("omny_xray_gui_send_message").get('value')
self.user_message = user_message
# sample name
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
self.sample_name = sample_message
# enable frame acquisition
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
self.on_live_view_enabled(bool(update_frame_acq))
# enable submit button
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
self.enable_submit_button(enable_submit_button)
@SafeSlot()
def submit(self):
"""Execute submit action by submit button."""
if self.roi_manager.single_active_roi is None:
logger.warning("No active ROI")
return
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
roi_center_x = roi_coordinates['center_x']
roi_center_y = roi_coordinates['center_y']
# Case of rectangular ROI
if isinstance(self.roi_manager.single_active_roi, RectangularROI):
roi_width = roi_coordinates['width']
roi_height = roi_coordinates['height']
elif isinstance(self.roi_manager.single_active_roi, CircularROI):
roi_width = roi_coordinates['diameter']
roi_height = roi_coordinates['radius']
else:
logger.warning("Unsupported ROI type for submit action.")
return
print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
# submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1)
def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
getattr(self.dev,CAMERA[0]).live_mode = False
super().cleanup()
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
win = XRayEye()
win.resize(1000, 800)
win.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1 @@
{'files': ['x_ray_eye.py']}

View File

@@ -0,0 +1,57 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from bec_widgets.utils.bec_designer import designer_material_icon
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye import XRayEye
DOM_XML = """
<ui language='c++'>
<widget class='XRayEye' name='x_ray_eye'>
</widget>
</ui>
"""
class XRayEyePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = XRayEye(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(XRayEye.ICON_NAME)
def includeFile(self):
return "x_ray_eye"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "XRayEye"
def toolTip(self):
return "XRayEye"
def whatsThis(self):
return self.toolTip()

View File

@@ -213,6 +213,8 @@ ftransy:
onFailure: buffer
readOnly: false
readoutPriority: baseline
userParameter:
sensor_voltage: -2.4
ftransz:
description: Sample transer Z
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
@@ -333,8 +335,8 @@ rtx:
readOnly: false
readoutPriority: on_request
userParameter:
low_signal: 11000
min_signal: 10000
low_signal: 10000
min_signal: 9000
rt_pid_voltage: -0.06219
rty:
description: flomni rt
@@ -362,3 +364,105 @@ rtz:
onFailure: buffer
readOnly: false
readoutPriority: on_request
############################################################
####################### Cameras ############################
############################################################
cam_flomni_gripper:
description: Camera sample changer
deviceClass: csaxs_bec.devices.omny.webcam_viewer.WebcamViewer
deviceConfig:
url: http://flomnicamserver:5000/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_flomni_overview:
description: Camera flomni overview
deviceClass: csaxs_bec.devices.omny.webcam_viewer.WebcamViewer
deviceConfig:
url: http://flomnicamserver:5001/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_xeye:
description: Camera flOMNI Xray eye ID1
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
deviceConfig:
camera_id: 1
bits_per_pixel: 24
num_rotation_90: 3
transpose: false
force_monochrome: true
m_n_colormode: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async
cam_ids_rgb:
description: Camera flOMNI Xray eye ID203
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
deviceConfig:
camera_id: 203
bits_per_pixel: 24
num_rotation_90: 3
transpose: false
force_monochrome: true
m_n_colormode: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async
# ############################################################
# ################### flOMNI temperatures ####################
# ############################################################
flomni_temphum:
description: flOMNI Temperatures and humidity
deviceClass: csaxs_bec.devices.omny.flomni_temp_and_humidity.FlomniTempHum
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
# ############################################################
# ########## OMNY / flOMNI / LamNI fast shutter ##############
# ############################################################
omnyfsh:
description: omnyfsh connects to read fast shutter at X12 if in that network
deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastEpicsShutter
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
#################### GUI Signals ###########################
############################################################
omny_xray_gui:
description: Gui Epics signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
calculated_signal:
description: Calculated signal from alignment for fit
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def just_rand():\n return 42"
enabled: true
readOnly: false
readoutPriority: baseline

View File

@@ -1,35 +0,0 @@
#Standard configuration of an Owis LTM80 linear stage
samx:
description: Owis LTM80 linear stage
deviceClass: ophyd_devices.devices.EpicsMotorEx
deviceConfig:
prefix: X12SA-ES2-ES01
motor_resolution: 0.00125
base_velocity: 0.25
velocity: 2.5
backlash_distance: 0.125
acceleration: 0.4
user_offset_dir: 0
direction_of_travel: 1
deviceTags:
- cSAXS
- user_motor
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false
#xbpm1x:
# description: 'X-ray BPM1 in frontend translation x'
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: 'X12SA-FE-XBPM1:TRX'
# onFailure: raise
# enabled: true
# readoutPriority: baseline
# readOnly: false
# softwareTrigger: false
# deviceTags:
# - cSAXS
# - frontend

View File

@@ -26,20 +26,6 @@ dmmx:
- cSAXS
- optics
#E_kev:
# description: 'Double Multilayer Monochromator, energy (keV)'
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: 'X12SA-OP-DMM1:ENERGY'
# onFailure: raise
# enabled: true
# readoutPriority: baseline
# readOnly: true
# softwareTrigger: false
# deviceTags:
# - cSAXS
# - optics
dmmy:
description: 'Double Multilayer Monochromator, translation Y'
deviceClass: ophyd.EpicsMotor

View File

@@ -1 +1 @@
from .ids_camera_new import IDSCamera
from .ids_camera import IDSCamera

View File

@@ -18,6 +18,7 @@ import atexit
from typing import Literal
import numpy as np
import time
from bec_lib.logger import bec_logger
from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error
@@ -66,8 +67,8 @@ class IDSCameraObject:
check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject")
if (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
):
logger.info("Bayer color mode detected.")
# setup the color depth to the current windows setting
@@ -76,16 +77,16 @@ class IDSCameraObject:
) # TODO This raises an error - maybe check the m_n_colormode value
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
):
# for color camera models use RGB32 mode
self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
self.n_bits_per_pixel = self.ueye.INT(32)
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
):
# for color camera models use RGB32 mode
self.m_n_colormode = self.ueye.IS_CM_MONO8
@@ -159,15 +160,17 @@ class Camera:
"""
def __init__(
self,
camera_id: int,
m_n_colormode: Literal[0, 1, 2, 3] = 1,
bits_per_pixel: int = 24,
connect: bool = True,
self,
camera_id: int,
m_n_colormode: Literal[0, 1, 2, 3] = 1,
bits_per_pixel: int = 24,
connect: bool = True,
force_monochrome: bool = False,
):
self.ueye = ueye
self.camera_id = camera_id
self._inputs = {"m_n_colormode": m_n_colormode, "bits_per_pixel": bits_per_pixel}
self.force_monochrome = force_monochrome
self._connected = False
self.cam = None
atexit.register(self.on_disconnect)
@@ -197,14 +200,16 @@ class Camera:
self.cam = IDSCameraObject(self.camera_id, **self._inputs)
self._connected = True
def on_disconnect(self):
"""Disconnect from the camera."""
def on_disconnect(self, delay_after: float = 0.3):
"""Disconnect from the camera and optionally wait a short time for driver cleanup."""
try:
if self.cam and self.cam.h_cam:
check_error(self.ueye.is_ExitCamera(self.cam.h_cam), "IDSCameraObject")
self._connected = False
self.cam = None
logger.info("Camera disconnected.")
if delay_after > 0:
time.sleep(delay_after)
logger.debug(f"Waited {delay_after:.2f}s after camera disconnect for cleanup.")
except Exception as e:
logger.info(f"Error during camera disconnection: {e}")
@@ -263,9 +268,19 @@ class Camera:
if array is None:
logger.error("Failed to get image data from the camera.")
return None
return np.reshape(
img = np.reshape(
array, (self.cam.height.value, self.cam.width.value, self.cam.bytes_per_pixel)
)
# If RGB image (H, W, 3), reshuffle channels from BGR → RGB
if img.ndim == 3 and img.shape[2] == 3:
img = img[:, :, ::-1]
if self.force_monochrome:
gray = np.dot(img[..., :3], [0.2989, 0.5870, 0.1140]).astype(np.uint8)
# expand to 3D shape (H, W, 1) for consistency with real mono cams
img = np.expand_dims(gray, axis=-1)
img = np.ascontiguousarray(img)
return img
if __name__ == "__main__":

View File

@@ -1,403 +1,225 @@
"""IDS Camera class for cSAXS IDS cameras."""
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
import numpy as np
from bec_lib import messages
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind, Signal, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import PreviewSignal
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger
class ROISignal(Signal):
"""
Signal to handle the Region of Interest (ROI) for the IDS camera.
It is a tuple of (x, y, width, height).
"""
def __init__(
self,
*,
name,
roi: tuple | None = None,
value=0,
dtype=None,
shape=None,
timestamp=None,
parent=None,
labels=None,
kind=Kind.hinted,
tolerance=None,
rtolerance=None,
metadata=None,
cl=None,
attr_name="",
):
super().__init__(
name=name,
value=value,
dtype=dtype,
shape=shape,
timestamp=timestamp,
parent=parent,
labels=labels,
kind=kind,
tolerance=tolerance,
rtolerance=rtolerance,
metadata=metadata,
cl=cl,
attr_name=attr_name,
)
self.roi = roi
def get(self, **kwargs):
image = self.parent.image_data.get().data
if not isinstance(image, np.ndarray):
return -1 # -1 if no valid image is available
if self.roi is None:
roi = (0, 0, image.shape[1], image.shape[0])
else:
roi = self.roi
if len(image.shape) > 2:
image = np.sum(image, axis=2) # Convert to grayscale if it's a color image
return np.sum(image[roi[1] : roi[1] + roi[3], roi[0] : roi[0] + roi[2]], (0, 1))
class IDSCamera(PSIDeviceBase):
""" "
#---------------------------------------------------------------------------------------------------------------------------------------
"""IDS Camera class for cSAXS.
#Variables
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
sInfo = ueye.SENSORINFO()
cInfo = ueye.CAMINFO()
pcImageMemory = ueye.c_mem_p()
MemID = ueye.int()
rectAOI = ueye.IS_RECT()
pitch = ueye.INT()
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
bytes_per_pixel = int(nBitsPerPixel / 8)
ids_cam
...
This class inherits from PSIDeviceBase and implements the necessary methods
to interact with the IDS camera using the pyueye library.
"""
USER_ACCESS = ["start_live_mode", "stop_live_mode", "set_roi", "width", "height"]
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.", num_rotation_90=0,
transpose=False)
roi_signal = Cpt(
AsyncSignal,
name="roi_signal",
ndim=0,
max_size=1000,
doc="Signal for the region of interest (ROI).",
async_update={"type": "add", "max_shape": [None]},
)
image_data = Cpt(PreviewSignal, ndim=2, kind=Kind.omitted)
# roi_bot_left = Cpt(ROISignal, roi=(400, 525, 118, 105), kind=Kind.normal)
# roi_bot_right = Cpt(ROISignal, roi=(518, 525, 118, 105), kind=Kind.normal)
# roi_top_left = Cpt(ROISignal, roi=(400, 630, 118, 105), kind=Kind.normal)
# roi_top_right = Cpt(ROISignal, roi=(518, 630, 118, 105), kind=Kind.normal)
# roi_signal = Cpt(ROISignal, kind=Kind.normal, doc="Region of Interest signal")
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
def __init__(
self,
prefix="",
*,
name: str,
camera_ID: int,
bits_per_pixel: int,
channels: int,
m_n_colormode: int,
kind=None,
device_manager=None,
**kwargs,
self,
*,
name: str,
camera_id: int,
prefix: str = "",
scan_info: ScanInfo | None = None,
m_n_colormode: Literal[0, 1, 2, 3] = 1,
bits_per_pixel: Literal[8, 24] = 24,
live_mode: bool = False,
num_rotation_90: int = 0,
transpose: bool = False,
force_monochrome: bool = False,
**kwargs,
):
"""Initialize the IDS Camera.
super().__init__(
prefix=prefix, name=name, kind=kind, device_manager=device_manager, **kwargs
)
self.camera_ID = camera_ID
self.bits_per_pixel = bits_per_pixel
self.bytes_per_pixel = None
self.channels = channels
self._m_n_colormode_input = m_n_colormode
self.m_n_colormode = None
self.ueye = ueye
self.h_cam = None
self.s_info = None
self.data_thread = None
self.c_info = None
self.pc_image_memory = None
self.mem_id = None
self.rect_aoi = None
self.pitch = None
self.n_bits_per_pixel = None
self.width = None
self.height = None
self.thread_event = threading.Event()
self.data_thread = None
self._roi: tuple | None = None # x, y, width, height
logger.info(
f"Deprecation warning: The IDSCamera class is deprecated. Use the new IDSCameraNew class instead."
Args:
name (str): Name of the device.
camera_id (int): The ID of the camera device.
prefix (str): Prefix for the device.
scan_info (ScanInfo | None): Scan information for the device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
"""
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self._live_mode_thread: threading.Thread | None = None
self._stop_live_mode_event: threading.Event = threading.Event()
self.cam = Camera(
camera_id=camera_id,
m_n_colormode=m_n_colormode,
bits_per_pixel=bits_per_pixel,
connect=False,
)
self._live_mode = False
self._inputs = {"live_mode": live_mode}
self._mask = np.zeros((1, 1), dtype=np.uint8)
self.image.num_rotation_90 = num_rotation_90
self.image.transpose = transpose
self._force_monochrome = force_monochrome
def set_roi(self, x: int, y: int, width: int, height: int):
self._roi = (x, y, width, height)
############## Live Mode Methods ##############
def start_backend(self):
if self.ueye is None:
raise ImportError("The pyueye library is not installed.")
self.h_cam = self.ueye.HIDS(
self.camera_ID
) # 0: first available camera; 1-254: The camera with the specified camera ID
self.s_info = self.ueye.SENSORINFO()
self.c_info = self.ueye.CAMINFO()
self.pc_image_memory = self.ueye.c_mem_p()
self.mem_id = self.ueye.int()
self.rect_aoi = self.ueye.IS_RECT()
self.pitch = self.ueye.INT()
self.n_bits_per_pixel = self.ueye.INT(
self.bits_per_pixel
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
self.m_n_colormode = self.ueye.INT(
self._m_n_colormode_input
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
@property
def mask(self) -> np.ndarray:
"""Return the current region of interest (ROI) for the camera."""
return self._mask
# Starts the driver and establishes the connection to the camera
ret = self.ueye.is_InitCamera(self.h_cam, None)
if ret != self.ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
@mask.setter
def mask(self, value: np.ndarray):
"""
Set the region of interest (ROI) for the camera.
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that c_info points to
ret = self.ueye.is_GetCameraInfo(self.h_cam, self.c_info)
if ret != self.ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
Args:
value (np.ndarray): The mask to set as the ROI.
"""
if value.ndim != 2:
raise ValueError("ROI mask must be a 2D array.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]:
raise ValueError(
f"ROI mask shape {value.shape} does not match image shape {img_shape}."
)
self._mask = value
# You can query additional information about the sensor type used in the camera
ret = self.ueye.is_GetSensorInfo(self.h_cam, self.s_info)
if ret != self.ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
@property
def live_mode(self) -> bool:
"""Return whether the camera is in live mode."""
return self._live_mode
ret = self.ueye.is_ResetToDefault(self.h_cam)
if ret != self.ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
# Set display mode to DIB
ret = self.ueye.is_SetDisplayMode(self.h_cam, self.ueye.IS_SET_DM_DIB)
# Set the right color mode
if (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
):
# setup the color depth to the current windows setting
self.ueye.is_GetColorDepth(self.h_cam, self.n_bits_per_pixel, self.m_n_colormode)
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
print("IS_COLORMODE_BAYER: ")
print("\tm_n_colormode: \t\t", self.m_n_colormode)
print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
):
# for color camera models use RGB32 mode
m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
n_bits_per_pixel = self.ueye.INT(32)
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
print("IS_COLORMODE_CBYCRY: ")
print("\tm_n_colormode: \t\t", m_n_colormode)
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
):
# for color camera models use RGB32 mode
m_n_colormode = self.ueye.IS_CM_MONO8
n_bits_per_pixel = self.ueye.INT(8)
bytes_per_pixel = int(n_bits_per_pixel / 8)
print("IS_COLORMODE_MONOCHROME: ")
print("\tm_n_colormode: \t\t", m_n_colormode)
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else:
# for monochrome camera models use Y8 mode
m_n_colormode = self.ueye.IS_CM_MONO8
n_bits_per_pixel = self.ueye.INT(8)
bytes_per_pixel = int(n_bits_per_pixel / 8)
print("else")
# Can be used to set the size and position of an "area of interest"(AOI) within an image
ret = self.ueye.is_AOI(
self.h_cam,
self.ueye.IS_AOI_IMAGE_GET_AOI,
self.rect_aoi,
self.ueye.sizeof(self.rect_aoi),
)
if ret != self.ueye.IS_SUCCESS:
print("is_AOI ERROR")
self.width = self.rect_aoi.s32Width
self.height = self.rect_aoi.s32Height
# Prints out some information about the camera and the sensor
print("Camera model:\t\t", self.s_info.strSensorName.decode("utf-8"))
print("Camera serial no.:\t", self.c_info.SerNo.decode("utf-8"))
print("Maximum image width:\t", self.width)
print("Maximum image height:\t", self.height)
print()
# ---------------------------------------------------------------------------------------------------------------------------------------
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by n_bits_per_pixel
ret = self.ueye.is_AllocImageMem(
self.h_cam,
self.width,
self.height,
self.n_bits_per_pixel,
self.pc_image_memory,
self.mem_id,
)
if ret != self.ueye.IS_SUCCESS:
print("is_AllocImageMem ERROR")
else:
# Makes the specified image memory the active memory
ret = self.ueye.is_SetImageMem(self.h_cam, self.pc_image_memory, self.mem_id)
if ret != self.ueye.IS_SUCCESS:
print("is_SetImageMem ERROR")
@live_mode.setter
def live_mode(self, value: bool):
"""Set the live mode for the camera."""
if value != self._live_mode:
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
# Set the desired color mode
ret = self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode)
self._stop_live()
# Activates the camera's live video mode (free run mode)
ret = self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT)
if ret != self.ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera."""
if x < 0 or y < 0 or width <= 0 or height <= 0:
raise ValueError("ROI coordinates and dimensions must be positive integers.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if x + width > img_shape[1] or y + height > img_shape[0]:
raise ValueError("ROI exceeds camera dimensions.")
mask = np.zeros(img_shape, dtype=np.uint8)
mask[y: y + height, x: x + width] = 1
self.mask = mask
# Enables the queue mode for existing image memory sequences
ret = self.ueye.is_InquireImageMem(
self.h_cam,
self.pc_image_memory,
self.mem_id,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
def _start_live(self):
"""Start the live mode for the camera."""
if self._live_mode_thread is not None:
logger.info("Live mode thread is already running.")
return
self._stop_live_mode_event.clear()
self._live_mode_thread = threading.Thread(
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
)
if ret != self.ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
self._live_mode_thread.start()
def _stop_live(self):
"""Stop the live mode for the camera."""
if self._live_mode_thread is None:
logger.info("Live mode thread is not running.")
return
self._stop_live_mode_event.set()
self._live_mode_thread.join(timeout=5)
if self._live_mode_thread.is_alive():
logger.warning("Live mode thread did not stop gracefully.")
else:
print("Press q to leave the programm")
# startmeasureframerate = True
# Gain = False
self._live_mode_thread = None
logger.info("Live mode stopped.")
# Start live mode of camera immediately
self.start_live_mode()
def _live_mode_loop(self, stop_event: threading.Event):
"""Loop to capture images in live mode."""
while not stop_event.is_set():
try:
self.process_data(self.cam.get_image_data())
except Exception as e:
logger.error(f"Error in live mode loop: {e}")
break
stop_event.wait(0.2) # 5 Hz
def _start_data_thread(self):
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
self.data_thread.start()
def process_data(self, image: np.ndarray | None):
"""Process the image data before sending it to the preview signal."""
if image is None:
return
self.image.put(image)
def _receive_data_from_camera(self):
while not self.thread_event.is_set():
if self.ueye is None:
print("pyueye library not available.")
def get_last_image(self) -> np.ndarray:
"""Get the last captured image from the camera."""
image = self.image.get()
if image:
return image.data
############## User Interface Methods ##############
def on_connected(self):
"""Connect to the camera."""
self.cam.force_monochrome = self._force_monochrome
self.cam.on_connect()
self.live_mode = self._inputs.get("live_mode", False)
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
def on_destroy(self):
"""Clean up resources when the device is destroyed."""
self.cam.on_disconnect()
super().on_destroy()
def on_trigger(self):
"""Handle the trigger event."""
if not self.live_mode:
return
image = self.image.get()
if image is not None:
image: messages.DevicePreviewMessage
if self.mask.shape[0:2] != image.data.shape[0:2]:
logger.info(
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
)
return
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = self.ueye.get_data(
self.pc_image_memory,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
copy=False,
)
# ...reshape it in an numpy array...
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
self.image_data.put(frame)
time.sleep(0.1)
def wait_for_connection(self, all_signals=False, timeout=10):
if ueye is None:
raise ImportError(
"The pyueye library is not installed or doesn't provide the necessary c libs"
)
super().wait_for_connection(all_signals, timeout)
def start_live_mode(self):
if self.data_thread is not None:
self.stop_live_mode()
self._start_data_thread()
def stop_live_mode(self):
"""Stopping the camera live mode."""
self.thread_event.set()
if self.data_thread is not None:
self.data_thread.join()
self.thread_event.clear()
self.data_thread = None
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.start_backend()
self.start_live_mode()
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
def on_destroy(self) -> None:
"""Called when the device is destroyed. Cleanup resources here."""
self.stop_live_mode()
if len(image.data.shape) == 3:
# If the image has multiple channels, apply the mask to each channel
data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data
n_channels = 3
else:
data = image.data * self.mask
n_channels = 1
self.roi_signal.put(np.sum(data) / (np.sum(self.mask) * n_channels))
if __name__ == "__main__":
# Example usage
camera = IDSCamera(name="camera", camera_ID=201, bits_per_pixel=24, channels=3, m_n_colormode=1)
camera.wait_for_connection()
camera.on_destroy()
# Example usage of the IDSCamera class
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")

View File

@@ -1,226 +0,0 @@
"""IDS Camera class for cSAXS IDS cameras."""
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
import numpy as np
from bec_lib import messages
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger
class IDSCamera(PSIDeviceBase):
"""IDS Camera class for cSAXS.
This class inherits from PSIDeviceBase and implements the necessary methods
to interact with the IDS camera using the pyueye library.
"""
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.")
roi_signal = Cpt(
AsyncSignal,
name="roi_signal",
ndim=0,
max_size=1000,
doc="Signal for the region of interest (ROI).",
async_update={"type": "add", "max_shape": [None]},
)
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
def __init__(
self,
*,
name: str,
camera_id: int,
prefix: str = "",
scan_info: ScanInfo | None = None,
m_n_colormode: Literal[0, 1, 2, 3] = 1,
bits_per_pixel: Literal[8, 24] = 24,
live_mode: bool = False,
**kwargs,
):
"""Initialize the IDS Camera.
Args:
name (str): Name of the device.
camera_id (int): The ID of the camera device.
prefix (str): Prefix for the device.
scan_info (ScanInfo | None): Scan information for the device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
"""
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self._live_mode_thread: threading.Thread | None = None
self._stop_live_mode_event: threading.Event = threading.Event()
self.cam = Camera(
camera_id=camera_id,
m_n_colormode=m_n_colormode,
bits_per_pixel=bits_per_pixel,
connect=False,
)
self._live_mode = False
self._inputs = {"live_mode": live_mode}
self._mask = np.zeros((1, 1), dtype=np.uint8)
############## Live Mode Methods ##############
@property
def mask(self) -> np.ndarray:
"""Return the current region of interest (ROI) for the camera."""
return self._mask
@mask.setter
def mask(self, value: np.ndarray):
"""
Set the region of interest (ROI) for the camera.
Args:
value (np.ndarray): The mask to set as the ROI.
"""
if value.ndim != 2:
raise ValueError("ROI mask must be a 2D array.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]:
raise ValueError(
f"ROI mask shape {value.shape} does not match image shape {img_shape}."
)
self._mask = value
@property
def live_mode(self) -> bool:
"""Return whether the camera is in live mode."""
return self._live_mode
@live_mode.setter
def live_mode(self, value: bool):
"""Set the live mode for the camera."""
if value != self._live_mode:
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
self._stop_live()
def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera."""
if x < 0 or y < 0 or width <= 0 or height <= 0:
raise ValueError("ROI coordinates and dimensions must be positive integers.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if x + width > img_shape[1] or y + height > img_shape[0]:
raise ValueError("ROI exceeds camera dimensions.")
mask = np.zeros(img_shape, dtype=np.uint8)
mask[y : y + height, x : x + width] = 1
self.mask = mask
def _start_live(self):
"""Start the live mode for the camera."""
if self._live_mode_thread is not None:
logger.info("Live mode thread is already running.")
return
self._stop_live_mode_event.clear()
self._live_mode_thread = threading.Thread(
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
)
self._live_mode_thread.start()
def _stop_live(self):
"""Stop the live mode for the camera."""
if self._live_mode_thread is None:
logger.info("Live mode thread is not running.")
return
self._stop_live_mode_event.set()
self._live_mode_thread.join(timeout=5)
if self._live_mode_thread.is_alive():
logger.warning("Live mode thread did not stop gracefully.")
else:
self._live_mode_thread = None
logger.info("Live mode stopped.")
def _live_mode_loop(self, stop_event: threading.Event):
"""Loop to capture images in live mode."""
while not stop_event.is_set():
try:
self.process_data(self.cam.get_image_data())
except Exception as e:
logger.error(f"Error in live mode loop: {e}")
break
stop_event.wait(0.2) # 5 Hz
def process_data(self, image: np.ndarray | None):
"""Process the image data before sending it to the preview signal."""
if image is None:
return
self.image.put(image)
def get_last_image(self) -> np.ndarray:
"""Get the last captured image from the camera."""
image = self.image.get()
if image:
return image.data
############## User Interface Methods ##############
def on_connected(self):
"""Connect to the camera."""
self.cam.on_connect()
self.live_mode = self._inputs.get("live_mode", False)
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
def on_destroy(self):
"""Clean up resources when the device is destroyed."""
self.cam.on_disconnect()
super().on_destroy()
def on_trigger(self):
"""Handle the trigger event."""
if not self.live_mode:
return
image = self.image.get()
if image is not None:
image: messages.DevicePreviewMessage
if self.mask.shape[0:2] != image.data.shape[0:2]:
logger.info(
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
)
return
if len(image.data.shape) == 3:
# If the image has multiple channels, apply the mask to each channel
data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data
n_channels = 3
else:
data = image.data * self.mask
n_channels = 1
self.roi_signal.put(
{
self.roi_signal.name: {
"value": np.sum(data)
/ (np.sum(self.mask) * n_channels), # TODO could be optimized
"timestamp": time.time(),
}
}
)
if __name__ == "__main__":
# Example usage of the IDSCamera class
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")

View File

@@ -24,24 +24,25 @@ class FlomniSampleStorage(Device):
SUB_VALUE = "value"
_default_sub = SUB_VALUE
sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {}) for i in range(21)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {"auto_monitor": True}) for i in range(21)
}
sample_placed = Dcpt(sample_placed)
sample_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True, "auto_monitor": True})
for i in range(21)
}
sample_names = Dcpt(sample_names)
sample_in_gripper = Cpt(
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET"
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET", auto_monitor=True
)
sample_in_gripper_name = Cpt(
EpicsSignal,
name="sample_in_gripper_name",
read_pv="XOMNY-SAMPLE_DB_flomni100:GET.DESC",
string=True,
auto_monitor=True
)
def __init__(self, prefix="", *, name, **kwargs):

View File

@@ -0,0 +1,208 @@
import time
import datetime
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
from prettytable import FRAME, PrettyTable
import numpy as np
class FlomniTempHumError(Exception):
pass
class FlomniTempHum(Device):
USER_ACCESS = [
"show_all",
"help",
]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
temperature_mirror = Cpt(
EpicsSignal, name="temperature_mirror", read_pv="XOMNI-TEMPHUM-MIRROR:0.VAL"
)
temperature_mirrorset_set = Cpt(
EpicsSignal, name="temperature_mirrorset_set", read_pv="XOMNI-TEMPHUM-MIRRORSET_SET:0.VAL"
)
temperature_mirrorset_rb = Cpt(
EpicsSignal, name="temperature_mirrorset_rb", read_pv="XOMNI-TEMPHUM-MIRRORSET_RB:0.VAL"
)
temperature_osa = Cpt(
EpicsSignal, name="temperature_osa", read_pv="XOMNI-TEMPHUM-OSA:0.VAL"
)
temperature_osaset_set = Cpt(
EpicsSignal, name="temperature_osaset_set", read_pv="XOMNI-TEMPHUM-OSASET_SET:0.VAL"
)
temperature_osaset_rb = Cpt(
EpicsSignal, name="temperature_osaset_rb", read_pv="XOMNI-TEMPHUM-OSASET_RB:0.VAL"
)
omegactrl_alive = Cpt(
EpicsSignal, name="omegactrl_alive", read_pv="XOMNI-TEMPHUM-OMEGACTRL-ALIVE:0.VAL"
)
galilctrl_alive = Cpt(
EpicsSignal, name="galilctrl_alive", read_pv="XOMNI-TEMPHUM-GALILCTRL-ALIVE:0.VAL"
)
temperature_heater = Cpt(
EpicsSignal, name="temperature_heater", read_pv="XOMNI-TEMPHUM-HEATER:0.VAL"
)
temperature_heaterset_set = Cpt(
EpicsSignal, name="temperature_heaterset_set", read_pv="XOMNI-TEMPHUM-HEATERSET_SET:0.VAL"
)
temperature_heaterset_rb = Cpt(
EpicsSignal, name="temperature_heaterset_rb", read_pv="XOMNI-TEMPHUM-HEATERSET_RB:0.VAL"
)
temperature_heaterhousing = Cpt(
EpicsSignal, name="temperature_heaterhousing", read_pv="XOMNI-TEMPHUM-HEATERHOUSE:0.VAL"
)
temperature_heaterhousing_alarm = Cpt(
EpicsSignal, name="temperature_heaterhousing_alarm", read_pv="XOMNI-TEMPHUM-HEATERHOUSEALARM:0.VAL"
)
temperature_heater_enabled = Cpt(
EpicsSignal, name="temperature_heater_enabled", read_pv="XOMNI-TEMPHUM-HEAT_EN:0.VAL"
)
temperature_heater_enabled = Cpt(
EpicsSignal, name="temperature_heater_enabled", read_pv="XOMNI-TEMPHUM-HEAT_EN:0.VAL"
)
###### GALIL CONTROLLER
humidity_sensor1 = Cpt(
EpicsSignal, name="humidity_sensor1", read_pv="XOMNI-TEMPHUM-HUM1:0.VAL"
)
humidity_sensor2 = Cpt(
EpicsSignal, name="humidity_sensor2", read_pv="XOMNI-TEMPHUM-HUM2:0.VAL"
)
humidity_sensor1_temperature = Cpt(
EpicsSignal, name="humidity_sensor1_temperature", read_pv="XOMNI-TEMPHUM-TEMP1:0.VAL"
)
humidity_sensor2_temperature = Cpt(
EpicsSignal, name="humidity_sensor2_temperature", read_pv="XOMNI-TEMPHUM-TEMP2:0.VAL"
)
humidity_sensor1_err = Cpt(
EpicsSignal, name="humidity_sensor1_err", read_pv="XOMNI-TEMPHUM-ERR1:0.VAL"
)
humidity_sensor2_err = Cpt(
EpicsSignal, name="humidity_sensor2_err", read_pv="XOMNI-TEMPHUM-ERR2:0.VAL"
)
flow = Cpt(
EpicsSignal, name="flow", read_pv="XOMNI-TEMPHUM-FLOW:0.VAL"
)
flowset = Cpt(
EpicsSignal, name="flowset", read_pv="XOMNI-TEMPHUM-FLOWSET:0.VAL"
)
flowset_set = Cpt(
EpicsSignal, name="flowset_set", read_pv="XOMNI-TEMPHUM-FLOWSETSET:0.VAL"
)
humidityset = Cpt(
EpicsSignal, name="humidityset", read_pv="XOMNI-TEMPHUM-HUMSET:0.VAL"
)
humidityset_set = Cpt(
EpicsSignal, name="humidityset_set", read_pv="XOMNI-TEMPHUM-HUMSETSET:0.VAL"
)
suction = Cpt(
EpicsSignal, name="suction", read_pv="XOMNI-TEMPHUM-SUCTION:0.VAL"
)
valvedry = Cpt(
EpicsSignal, name="valvedry", read_pv="XOMNI-TEMPHUM-VALVEDRY:0.VAL"
)
valvewet = Cpt(
EpicsSignal, name="valvewet", read_pv="XOMNI-TEMPHUM-VALVEWET:0.VAL"
)
setuptemp = Cpt(
EpicsSignal, name="setuptemp", read_pv="XOMNI-TEMPHUM-SETUPTEMP:0.VAL"
)
def omega_controller_running(self):
time_diff = np.fabs(float(self.omegactrl_alive.get()) - time.time())
if time_diff > 120:
return False
else:
return True
def galil_controller_running(self):
time_diff = np.fabs(float(self.galilctrl_alive.get()) - time.time())
if time_diff > 120:
return False
else:
return True
def __init__(self, prefix="", *, name, **kwargs):
super().__init__(prefix, name=name, **kwargs)
self.temperature_mirror.subscribe(self._emit_value, run=False)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
def show_all(self):
print("=== flOMNI Temperature & Humidity Overview ===")
print("")
print("Temperatures:")
print(f" Mirror: {float(self.temperature_mirror.get()):7.2f} °C")
print(f" Mirror Setpoint (RB): {float(self.temperature_mirrorset_rb.get()):7.2f} °C")
print(f" OSA: {float(self.temperature_osa.get()):7.2f} °C")
print(f" OSA Setpoint (RB): {float(self.temperature_osaset_rb.get()):7.2f} °C")
print(f" Heater: {float(self.temperature_heater.get()):7.2f} °C")
print(f" Heater Setpoint (RB): {float(self.temperature_heaterset_rb.get()):7.2f} °C")
print(f" Heater Enabled: {float(self.temperature_heater_enabled.get()):.0f}")
print(f" Heater Housing: {float(self.temperature_heaterhousing.get()):7.2f} °C")
print(f" Heater Housing Alarm: {float(self.temperature_heaterhousing_alarm.get()):.0f}")
print("")
print("Humidity Sensors:")
print(f" Sensor 1 Humidity: {float(self.humidity_sensor1.get()):7.2f} %RH")
print(f" Sensor 1 Temperature: {float(self.humidity_sensor1_temperature.get()):7.2f} °C")
print(f" Sensor 1 Error: {float(self.humidity_sensor1_err.get()):.0f}")
print(f" Sensor 2 Humidity: {float(self.humidity_sensor2.get()):7.2f} %RH")
print(f" Sensor 2 Temperature: {float(self.humidity_sensor2_temperature.get()):7.2f} °C")
print(f" Sensor 2 Error: {float(self.humidity_sensor2_err.get()):.0f}")
print(f" Humidity Setpoint: {float(self.humidityset.get()):7.2f} %RH")
print("")
print("Flow Control:")
print(f" Flow: {float(self.flow.get()):7.2f} sccm")
print(f" Flow Setpoint (RB): {float(self.flowset.get()):7.2f} sccm")
print("")
print("Suction:")
print(f" Suction: {float(self.suction.get()):7.2f}")
print("")
print("Valves:")
print(f" Dry Valve: {float(self.valvedry.get()):.0f}")
print(f" Wet Valve: {float(self.valvewet.get()):.0f}")
print("")
print("Controller Heartbeats:")
print(f" OMEGA Controller Alive: {self.omega_controller_running()}")
print(f" GALIL Controller Alive: {self.galil_controller_running()}")
print("==============================================")
def help(self):
print("Help for flOMNI temperature and humidity control system:")
print("Available methods:")
print(" show_all() - display all current values")

View File

@@ -37,84 +37,86 @@ class OMNYSampleStorage(Device):
_default_sub = SUB_VALUE
sample_shuttle_A_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {"auto_monitor": True}) for i in range(1, 7)
}
sample_shuttle_A_placed = Dcpt(sample_shuttle_A_placed)
sample_shuttle_B_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {"auto_monitor": True}) for i in range(1, 7)
}
sample_shuttle_B_placed = Dcpt(sample_shuttle_B_placed)
sample_shuttle_C_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7)
}
sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed)
sample_shuttle_C_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {}) for i in range(1, 7)
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7)
}
sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed)
parking_placed = {
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {}) for i in range(1, 7)
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {"auto_monitor": True}) for i in range(1, 7)
}
parking_placed = Dcpt(parking_placed)
sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {"auto_monitor": True})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
}
sample_placed = Dcpt(sample_placed)
sample_shuttle_A_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True, "auto_monitor": True})
for i in range(1, 7)
}
sample_shuttle_A_names = Dcpt(sample_shuttle_A_names)
sample_shuttle_B_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True, "auto_monitor": True})
for i in range(1, 7)
}
sample_shuttle_B_names = Dcpt(sample_shuttle_B_names)
sample_shuttle_C_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True, "auto_monitor": True})
for i in range(1, 7)
}
sample_shuttle_C_names = Dcpt(sample_shuttle_C_names)
parking_names = {
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True})
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True, "auto_monitor": True})
for i in range(1, 7)
}
parking_names = Dcpt(parking_names)
sample_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True, "auto_monitor": True})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
}
sample_names = Dcpt(sample_names)
sample_in_gripper = Cpt(
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL"
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL", auto_monitor=True
)
sample_in_gripper_name = Cpt(
EpicsSignal,
name="sample_in_gripper_name",
read_pv="XOMNY-SAMPLE_DB_omny:110.DESC",
string=True,
auto_monitor=True
)
sample_in_samplestage = Cpt(
EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL"
EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL", auto_monitor=True
)
sample_in_samplestage_name = Cpt(
EpicsSignal,
name="sample_in_samplestage_name",
read_pv="XOMNY-SAMPLE_DB_omny:0.DESC",
string=True,
auto_monitor=True
)
def __init__(self, prefix="", *, name, **kwargs):

View File

@@ -392,6 +392,9 @@ class RtFlomniController(Controller):
rtx = self.get_device_manager().devices.rtx
min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal")
print(f"low signal: {low_signal}")
print(f"min signal: {min_signal}")
print(f"signal: {signal}")
if signal < min_signal:
time.sleep(1)
if signal < min_signal:

View File

@@ -0,0 +1,82 @@
import time
import socket
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import EpicsSignal
class OMNYFastEpicsShutterError(Exception):
pass
def _detect_host_pv():
"""Detect host subnet and return appropriate PV name."""
try:
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
if local_ip.startswith("129.129.122."):
return "X12SA-ES1-TTL:OUT_01"
else:
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
except Exception as ex:
print(f"Warning: could not detect IP subnet ({ex}), using dummy shutter.")
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
class OMNYFastEpicsShutter(Device):
"""
Fast EPICS shutter with automatic PV selection based on host subnet.
"""
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help"]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
# PV is detected dynamically at import time
shutter = Cpt(EpicsSignal, name="shutter", read_pv=_detect_host_pv(), auto_monitor=True)
def __init__(self, prefix="", *, name, **kwargs):
super().__init__(prefix, name=name, **kwargs)
self.shutter.subscribe(self._emit_value)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self.wait_for_connection()
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
# -----------------------------------------------------
# User-facing shutter control functions
# -----------------------------------------------------
def fshopen(self):
"""Open the fast shutter."""
try:
self.shutter.put(1, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}")
def fshclose(self):
"""Close the fast shutter."""
try:
self.shutter.put(0, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}")
def fshstatus(self):
"""Return the fast shutter status (0=closed, 1=open)."""
try:
return self.shutter.get()
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}")
def fshinfo(self):
"""Print information about which EPICS PV channel is being used."""
pvname = self.shutter.pvname
print(f"Fast shutter connected to EPICS channel: {pvname}")
return pvname
def help(self):
"""Display available user methods."""
print("Available methods:")
for method in self.USER_ACCESS:
print(f" - {method}")

View File

@@ -0,0 +1,65 @@
import requests
import threading
import cv2
import numpy as np
from ophyd import Device, Component as Cpt
from ophyd_devices import PreviewSignal
import traceback
from bec_lib.logger import bec_logger
logger = bec_logger.logger
class WebcamViewer(Device):
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None:
super().__init__(name=name, **kwargs)
self.url = url
self._connection = None
self._update_thread = None
self._buffer = b""
self._shutdown_event = threading.Event()
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
def start_live_mode(self) -> None:
if self._connection is not None:
return
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
def _update_loop(self) -> None:
while not self._shutdown_event.is_set():
try:
self._connection = requests.get(self.url, stream=True)
for chunk in self._connection.iter_content(chunk_size=1024):
self._buffer += chunk
start = self._buffer.find(b'\xff\xd8') # JPEG start
end = self._buffer.find(b'\xff\xd9') # JPEG end
if start == -1 or end == -1:
continue
jpg = self._buffer[start:end+2]
self._buffer = self._buffer[end+2:]
image = cv2.imdecode(np.frombuffer(jpg, np.uint8), cv2.IMREAD_COLOR)
if image is not None:
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
self.preview.put(image)
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Image update loop failed: {content}")
def stop_live_mode(self) -> None:
if self._connection is None:
return
self._shutdown_event.set()
if self._connection is not None:
self._connection.close()
self._connection = None
if self._update_thread is not None:
self._update_thread.join()
self._update_thread = None
self._shutdown_event.clear()

View File

@@ -0,0 +1,74 @@
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
class OMNYXRayEpicsGUI(Device):
save_frame = Cpt(
EpicsSignal, name="save_frame", read_pv="XOMNYI-XEYE-SAVFRAME:0",auto_monitor=True
)
update_frame_acqdone = Cpt(
EpicsSignal, name="update_frame_acqdone", read_pv="XOMNYI-XEYE-ACQDONE:0",auto_monitor=True
)
update_frame_acq = Cpt(
EpicsSignal, name="update_frame_acq", read_pv="XOMNYI-XEYE-ACQ:0",auto_monitor=True
)
width_y_dynamic = {
f"width_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YWIDTH_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_y = Dcpt(width_y_dynamic)
width_x_dynamic = {
f"width_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XWIDTH_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_x = Dcpt(width_x_dynamic)
enable_mv_x = Cpt(
EpicsSignal, name="enable_mv_x", read_pv="XOMNYI-XEYE-ENAMVX:0",auto_monitor=True
)
enable_mv_y = Cpt(
EpicsSignal, name="enable_mv_y", read_pv="XOMNYI-XEYE-ENAMVY:0",auto_monitor=True
)
send_message = Cpt(
EpicsSignal, name="send_message", read_pv="XOMNYI-XEYE-MESSAGE:0.DESC",auto_monitor=True
)
sample_name = Cpt(
EpicsSignal, name="sample_name", read_pv="XOMNYI-XEYE-SAMPLENAME:0.DESC",auto_monitor=True
)
angle = Cpt(
EpicsSignal, name="angle", read_pv="XOMNYI-XEYE-ANGLE:0",auto_monitor=True
)
pixel_size = Cpt(
EpicsSignal, name="pixel_size", read_pv="XOMNYI-XEYE-PIXELSIZE:0",auto_monitor=True
)
submit = Cpt(
EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True
)
step = Cpt(
EpicsSignal, name="step", read_pv="XOMNYI-XEYE-STEP:0",auto_monitor=True
)
xval_x_dynamic = {
f"xval_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XVAL_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
xval_x = Dcpt(xval_x_dynamic)
yval_y_dynamic = {
f"yval_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YVAL_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
yval_y = Dcpt(yval_y_dynamic)
recbg = Cpt(
EpicsSignal, name="recbg", read_pv="XOMNYI-XEYE-RECBG:0",auto_monitor=True
)
stage_pos_x_dynamic = {
f"stage_pos_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-STAGEPOSX:{i}", {"auto_monitor": True}) for i in range(1, 6)
}
stage_pos_x = Dcpt(stage_pos_x_dynamic)
mvx = Cpt(
EpicsSignal, name="mvx", read_pv="XOMNYI-XEYE-MVX:0",auto_monitor=True
)
mvy = Cpt(
EpicsSignal, name="mvy", read_pv="XOMNYI-XEYE-MVY:0",auto_monitor=True
)

View File

@@ -33,11 +33,11 @@ logger = bec_logger.logger
class FlomniFermatScan(SyncFlyScanBase):
scan_name = "flomni_fermat_scan"
scan_report_hint = "table"
scan_type = "fly"
required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
use_scan_progress_report = True
def __init__(
self,
@@ -74,6 +74,7 @@ class FlomniFermatScan(SyncFlyScanBase):
"""
super().__init__(parameter=parameter, **kwargs)
self.show_live_table = False
self.axis = []
self.fovx = fovx
self.fovy = fovy
@@ -168,6 +169,7 @@ class FlomniFermatScan(SyncFlyScanBase):
tracker_signal_status = yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.laser_tracker_check_signalstrength"
)
#self.device_manager.connector.send_client_info(tracker_signal_status)
if tracker_signal_status == "low":
self.device_manager.connector.raise_alarm(
severity=0,

View File

@@ -88,6 +88,14 @@ Depending on the tomo mode following parameters can be given to the `flomni.tomo
| Golden ratio tomography (sorted in bunches) | projection_number=None |
| Equally spaced with golden starting angle | projection_number=None |
### GUI tools
During operation the BEC GUI will show the relevant cameras or progress information. To manually switch view TAB completion on 'flomni.flomnigui_' will show all options to control the GUI. Most useful
'flomni.flomnigui_show_cameras()' will show the cameras for sample transfer and interior overview
'flomni.flomnigui_show_progress()' will show the measurement progress GUI
'flomnigui_show_xeyealign()' will show the XrayEye alignment GUI
## How to setup flOMNI (software)
This part of the manual is intended for beamline staff and expert users
@@ -218,9 +226,15 @@ Update the values by, example for feyex and in position,
To refresh the frame of the xray eye windows software
`flomni.xrayeye_update_frame()`
This command can also be called to keep the shutter open and live view active
`flomni.xrayeye_update_frame(keep_shutter_open=True)`
To start the xray eye alignment (and clear any previous alignment)
`flomni.xrayeye_alignment_start()`
This command can also be called to keep the shutter open and live view active. Warning: The dose to the sample will be significantly higher.
`flomni.xrayeye_update_frame(keep_shutter_open=True)`
To load the fit parameters from directory _dir_path_ computed by _SPEC_ptycho_align.m_ in Matlab run
`flomni.read_alignment_offset(dir_path='')`
The loading routine uses default values for the vertical alignment. This behavior can be changed (e.g. for getting new default values) by the parameter `use_vertical_default_values=False`.

View File

@@ -16,7 +16,7 @@ dependencies = [
"bec_ipython_client",
"bec_lib",
"bec_server",
"ophyd_devices",
"ophyd_devices~=1.29",
"std_daq_client",
"jfjoch-client",
"rich",
@@ -24,6 +24,7 @@ dependencies = [
"pyueye", # for the IDS uEye camera
"bec_widgets",
"zmq",
"opencv-python",
]
[project.optional-dependencies]

View File

@@ -5,7 +5,7 @@ from unittest import mock
import numpy as np
import pytest
from csaxs_bec.devices.ids_cameras.ids_camera_new import IDSCamera
from csaxs_bec.devices.ids_cameras.ids_camera import IDSCamera
@pytest.fixture(scope="function")