Compare commits

..

10 Commits

Author SHA1 Message Date
x12sa
ae0ad918a3 feat: check all devices are enabled, if not try to enable
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m55s
2026-03-10 14:50:25 +01:00
x01dc
2c28bba8c6 added account check at startup
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m52s
2026-03-09 13:19:58 +01:00
x01dc
a36e96aaba renamings and no bl check in x01 network
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m52s
2026-03-09 13:10:38 +01:00
x01dc
11896a3900 moved bl checker, tomo_id, tomo_reconstruct to separate, sharable classes
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m53s
2026-03-08 19:50:22 +01:00
x01dc
5f4db23014 added light command to lamni
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m51s
2026-03-06 14:51:51 +01:00
x01dc
8ee588d138 guitools
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m50s
2026-03-06 14:36:12 +01:00
x01dc
241cc68d39 WIP guitools initial version 2026-03-06 14:35:54 +01:00
x01dc
c1559e4c0b fixed ascii logo flomni and omny
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m54s
2026-03-06 14:35:04 +01:00
x01dc
8013a6305d added beck startup
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m53s
CI for csaxs_bec / test (pull_request) Failing after 40s
2026-03-06 14:32:47 +01:00
x01dc
393b91c2ac now reading encoder values for axes with encoder
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m57s
CI for csaxs_bec / test (pull_request) Failing after 1m56s
2026-03-06 14:31:23 +01:00
28 changed files with 1538 additions and 938 deletions

View File

@@ -0,0 +1,188 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class LamniGuiToolsError(Exception):
pass
class LamniGuiTools:
def __init__(self):
self.text_box = None
self.progressbar = None
def set_client(self, client):
self.client = client
self.gui = self.client.gui
def lamnigui_show_gui(self):
if "lamni" in self.gui.windows:
self.gui.lamni.show()
else:
self.gui.new("lamni")
def lamnigui_stop_gui(self):
self.gui.lamni.hide()
def lamnigui_raise(self):
self.gui.lamni.raise_window()
def lamnigui_show_xeyealign(self):
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("xeyegui"):
self.lamnigui_remove_all_docks()
self.xeyegui = self.gui.lamni.new("xeyegui").new("XRayEye")
# start live
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
def _lamnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui,"lamni"):
if hasattr(self.gui.lamni,attribute_name):
return False
return True
def lamnigui_remove_all_docks(self):
self.gui.lamni.delete_all()
self.progressbar = None
self.text_box = None
def lamnigui_idle(self):
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("idle_text_box"):
self.lamnigui_remove_all_docks()
idle_text_box = self.gui.lamni.new("idle_textbox").new("TextBox")
text = (
"<pre>"
+ "██████╗ ███████╗ ██████╗ ██╗ █████╗ ███╗ ███╗███╗ ██╗██╗\n"
+ "██╔══██╗██╔════╝██╔════╝ ██║ ██╔══██╗████╗ ████║████╗ ██║██║\n"
+ "██████╔╝█████╗ ██║ ██║ ███████║██╔████╔██║██╔██╗ ██║██║\n"
+ "██╔══██╗██╔══╝ ██║ ██║ ██╔══██║██║╚██╔╝██║██║╚██╗██║██║\n"
+ "██████╔╝███████╗╚██████╗ ███████╗██║ ██║██║ ╚═╝ ██║██║ ╚████║██║\n"
+ "╚═════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝\n"
+ "</pre>"
)
idle_text_box.set_html_text(text)
def lamnigui_docs(self, filename: str | None = None):
import csaxs_bec
from pathlib import Path
print("The general lamni documentation is at \nhttps://sls-csaxs.readthedocs.io/en/latest/user/ptychography/lamni.html#user-ptychography-lamni")
csaxs_bec_basepath = Path(csaxs_bec.__file__).parent
docs_folder = (
csaxs_bec_basepath /
"bec_ipython_client" / "plugins" / "lamni" / "docs"
)
if not docs_folder.is_dir():
raise NotADirectoryError(f"Docs folder not found: {docs_folder}")
pdfs = sorted(docs_folder.glob("*.pdf"))
if not pdfs:
raise FileNotFoundError(f"No PDF files found in {docs_folder}")
# --- Resolve PDF ------------------------------------------------------
if filename is not None:
pdf_file = docs_folder / filename
if not pdf_file.exists():
raise FileNotFoundError(f"Requested file not found: {filename}")
else:
print("\nAvailable lamni documentation PDFs:\n")
for i, pdf in enumerate(pdfs, start=1):
print(f" {i:2d}) {pdf.name}")
print()
while True:
try:
choice = int(input(f"Select a file (1{len(pdfs)}): "))
if 1 <= choice <= len(pdfs):
pdf_file = pdfs[choice - 1]
break
print(f"Enter a number between 1 and {len(pdfs)}.")
except ValueError:
print("Invalid input. Please enter a number.")
# --- GUI handling (active existence check) ----------------------------
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("PdfViewerWidget"):
self.lamnigui_remove_all_docks()
self.pdf_viewer = self.gui.lamni.new(widget="PdfViewerWidget")
# --- Load PDF ---------------------------------------------------------
self.pdf_viewer.PdfViewerWidget.load_pdf(str(pdf_file.resolve()))
print(f"\nLoaded: {pdf_file.name}\n")
def _lamnicam_check_device_exists(self, device):
try:
device
except:
return False
else:
return True
def lamnigui_show_progress(self):
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("progressbar"):
self.lamnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.lamni.new("progressbar").new("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
self.progressbar.enable_auto_updates(False)
# Set precision for the self.progressbar display
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
# Setting multiple rigns with different values
self.progressbar.set_number_of_bars(3)
self.progressbar.rings[0].set_update("manual")
self.progressbar.rings[1].set_update("manual")
self.progressbar.rings[2].set_update("scan")
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.lamni.new(name="progress_text").new("TextBox")
self._lamnigui_update_progress()
def _lamnigui_update_progress(self):
if self.progressbar is not None:
progress = self.progress["projection"] / self.progress["total_projections"] * 100
subtomo_progress = (
self.progress["subtomo_projection"]
/ self.progress["subtomo_total_projections"]
* 100
)
self.progressbar.set_value([progress, subtomo_progress, 0])
if self.text_box is not None:
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
self.text_box.set_plain_text(text)
if __name__ == "__main__":
from bec_lib.client import BECClient
from bec_widgets.cli.client_utils import BECGuiClient
client = BECClient()
client.start()
client.gui = BECGuiClient()
lamni_gui = LamniGuiTools(client)
lamni_gui.lamnigui_show_gui()
lamni_gui.lamnigui_show_progress()

View File

@@ -2,7 +2,6 @@ import builtins
import datetime import datetime
import os import os
import subprocess import subprocess
import threading
import time import time
from pathlib import Path from pathlib import Path
@@ -12,7 +11,13 @@ from bec_lib.alarm_handler import AlarmBase
from bec_lib.pdf_writer import PDFWriter from bec_lib.pdf_writer import PDFWriter
from typeguard import typechecked from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
BeamlineChecker,
OMNYTools,
PtychoReconstructor,
TomoIDManager,
)
from csaxs_bec.bec_ipython_client.plugins.LamNI.gui_tools import LamniGuiTools
from .alignment import XrayEyeAlign from .alignment import XrayEyeAlign
from .lamni_optics_mixin import LaMNIInitStages, LamNIOpticsMixin from .lamni_optics_mixin import LaMNIInitStages, LamNIOpticsMixin
@@ -26,26 +31,26 @@ if builtins.__dict__.get("bec") is not None:
umvr = builtins.__dict__.get("umvr") umvr = builtins.__dict__.get("umvr")
class LamNI(LamNIOpticsMixin): class LamNI(LamNIOpticsMixin, LamniGuiTools):
def __init__(self, client): def __init__(self, client):
super().__init__() super().__init__()
self.client = client self.client = client
self.device_manager = client.device_manager self.device_manager = client.device_manager
self.align = XrayEyeAlign(client, self) self.align = XrayEyeAlign(client, self)
self.init = LaMNIInitStages(client) self.init = LaMNIInitStages(client)
self.check_shutter = True
self.check_light_available = True # Extracted collaborators
self.check_fofb = True self.bl_chk = BeamlineChecker(client)
self._check_msgs = [] self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
self.OMNYTools = OMNYTools(self.client)
self.tomo_id = -1 self.tomo_id = -1
self.special_angles = [] self.special_angles = []
self.special_angle_repeats = 20 self.special_angle_repeats = 20
self.special_angle_tolerance = 20 self.special_angle_tolerance = 20
self._current_special_angles = [] self._current_special_angles = []
self._beam_is_okay = True
self._stop_beam_check_event = None
self.beam_check_thread = None
self.OMNYTools = OMNYTools(self.client)
# Progress tracking # Progress tracking
self.progress = {} self.progress = {}
self.progress["tomo_type"] = "Equally spaced sub-tomograms" self.progress["tomo_type"] = "Equally spaced sub-tomograms"
@@ -57,104 +62,19 @@ class LamNI(LamNIOpticsMixin):
self.progress["angle"] = 0 self.progress["angle"] = 0
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Beamline checks # Beamline checks — delegated to BeamlineChecker
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def get_beamline_checks_enabled(self):
print(
f"Shutter: {self.check_shutter}\nFOFB: {self.check_fofb}\nLight available:"
f" {self.check_light_available}"
)
@property @property
def beamline_checks_enabled(self): def beamline_checks_enabled(self):
return { return self.bl_chk.checks_enabled
"shutter": self.check_shutter,
"fofb": self.check_fofb,
"light available": self.check_light_available,
}
@beamline_checks_enabled.setter @beamline_checks_enabled.setter
def beamline_checks_enabled(self, val: bool): def beamline_checks_enabled(self, val: bool):
self.check_shutter = val self.bl_chk.checks_enabled = val
self.check_light_available = val
self.check_fofb = val
self.get_beamline_checks_enabled()
def _run_beamline_checks(self): def get_beamline_checks_enabled(self):
msgs = [] self.bl_chk.print_status()
dev = builtins.__dict__.get("dev")
try:
if self.check_shutter:
shutter_val = dev.x12sa_es1_shutter_status.read(cached=True)
if shutter_val["value"].lower() != "open":
self._beam_is_okay = False
msgs.append("Check beam failed: Shutter is closed.")
if self.check_light_available:
machine_status = dev.sls_machine_status.read(cached=True)
if machine_status["value"] not in ["Light Available", "Light-Available"]:
self._beam_is_okay = False
msgs.append("Check beam failed: Light not available.")
if self.check_fofb:
fast_orbit_feedback = dev.sls_fast_orbit_feedback.read(cached=True)
if fast_orbit_feedback["value"] != "running":
self._beam_is_okay = False
msgs.append("Check beam failed: Fast orbit feedback is not running.")
except Exception:
logger.warning("Failed to check beam.")
return msgs
def _check_beam(self):
while not self._stop_beam_check_event.is_set():
self._check_msgs = self._run_beamline_checks()
if not self._beam_is_okay:
self._stop_beam_check_event.set()
time.sleep(1)
def _start_beam_check(self):
self._beam_is_okay = True
self._stop_beam_check_event = threading.Event()
self.beam_check_thread = threading.Thread(target=self._check_beam, daemon=True)
self.beam_check_thread.start()
def _was_beam_okay(self):
self._stop_beam_check_event.set()
self.beam_check_thread.join()
return self._beam_is_okay
def _print_beamline_checks(self):
for msg in self._check_msgs:
logger.warning(msg)
def _wait_for_beamline_checks(self):
self._print_beamline_checks()
try:
msg = bec.logbook.LogbookMessage()
msg.add_text(
"<p><mark class='pen-red'><strong>Beamline checks failed at"
f" {str(datetime.datetime.now())}: {''.join(self._check_msgs)}</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send update to SciLog.")
while True:
self._beam_is_okay = True
self._check_msgs = self._run_beamline_checks()
if self._beam_is_okay:
break
self._print_beamline_checks()
time.sleep(1)
try:
msg = bec.logbook.LogbookMessage()
msg.add_text(
"<p><mark class='pen-red'><strong>Operation resumed at"
f" {str(datetime.datetime.now())}.</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send update to SciLog.")
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Special angles # Special angles
@@ -212,7 +132,6 @@ class LamNI(LamNIOpticsMixin):
def feedback_status(self): def feedback_status(self):
self.device_manager.devices.rtx.controller.show_feedback_status() self.device_manager.devices.rtx.controller.show_feedback_status()
def show_interferometer_positions(self): def show_interferometer_positions(self):
self.device_manager.devices.rtx.controller.show_feedback_status() self.device_manager.devices.rtx.controller.show_feedback_status()
@@ -222,6 +141,12 @@ class LamNI(LamNIOpticsMixin):
def show_analog_signals(self): def show_analog_signals(self):
return self.device_manager.devices.rtx.controller.show_analog_signals() return self.device_manager.devices.rtx.controller.show_analog_signals()
def lights_off(self):
self.device_manager.devices.lsamx.controller.lights_off()
def lights_on(self):
self.device_manager.devices.lsamx.controller.lights_on()
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Global parameters (backed by BEC global vars) # Global parameters (backed by BEC global vars)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -371,6 +296,7 @@ class LamNI(LamNIOpticsMixin):
@ptycho_reconstruct_foldername.setter @ptycho_reconstruct_foldername.setter
def ptycho_reconstruct_foldername(self, val: str): def ptycho_reconstruct_foldername(self, val: str):
self.client.set_global_var("ptycho_reconstruct_foldername", val) self.client.set_global_var("ptycho_reconstruct_foldername", val)
self.reconstructor.folder_name = val # keep reconstructor in sync
@property @property
def tomo_angle_stepsize(self): def tomo_angle_stepsize(self):
@@ -491,23 +417,22 @@ class LamNI(LamNIOpticsMixin):
) )
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Sample database # Sample database — delegated to TomoIDManager in omny general tools
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def add_sample_database( def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
): ):
"""Add a sample to the OMNY sample database and retrieve the tomo id.""" """Add a sample to the OMNY sample database and retrieve the tomo id."""
subprocess.run( return self.tomo_id_manager.register(
"wget --user=omny --password=samples -q -O /tmp/currsamplesnr.txt" sample_name=samplename,
f" 'https://omny.web.psi.ch/samples/newmeasurement.php?sample={samplename}" date=date,
f"&date={date}&eaccount={eaccount}&scannr={scan_number}&setup={setup}" eaccount=eaccount,
f"&additional={sample_additional_info}&user={user}'", scan_number=scan_number,
shell=True, setup=setup,
additional_info=sample_additional_info,
user=user,
) )
with open("/tmp/currsamplesnr.txt") as tomo_number_file:
tomo_number = int(tomo_number_file.read())
return tomo_number
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Scan projection # Scan projection
@@ -524,7 +449,6 @@ class LamNI(LamNIOpticsMixin):
for stitch_x in range(-self.lamni_stitch_x, self.lamni_stitch_x + 1): for stitch_x in range(-self.lamni_stitch_x, self.lamni_stitch_x + 1):
for stitch_y in range(-self.lamni_stitch_y, self.lamni_stitch_y + 1): for stitch_y in range(-self.lamni_stitch_y, self.lamni_stitch_y + 1):
# pylint: disable=undefined-variable
self._current_scan_list.append(bec.queue.next_scan_number) self._current_scan_list.append(bec.queue.next_scan_number)
log_message = ( log_message = (
f"{str(datetime.datetime.now())}: LamNI scan projection at angle {angle}," f"{str(datetime.datetime.now())}: LamNI scan projection at angle {angle},"
@@ -562,18 +486,11 @@ class LamNI(LamNIOpticsMixin):
def tomo_reconstruct(self, base_path="~/Data10/specES1"): def tomo_reconstruct(self, base_path="~/Data10/specES1"):
"""Write the tomo reconstruct file for the reconstruction queue.""" """Write the tomo reconstruct file for the reconstruction queue."""
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
base_path = os.path.expanduser(base_path) self.reconstructor.write(
ptycho_queue_path = Path(os.path.join(base_path, self.ptycho_reconstruct_foldername)) scan_list=self._current_scan_list,
ptycho_queue_path.mkdir(parents=True, exist_ok=True) next_scan_number=bec.queue.next_scan_number,
base_path=base_path,
last_scan_number = bec.queue.next_scan_number - 1
ptycho_queue_file = os.path.abspath(
os.path.join(ptycho_queue_path, f"scan_{last_scan_number:05d}.dat")
) )
with open(ptycho_queue_file, "w") as queue_file:
scans = " ".join([str(scan) for scan in self._current_scan_list])
queue_file.write(f"p.scan_number {scans}\n")
queue_file.write("p.check_nextscan_started 1\n")
def _at_each_angle(self, angle: float) -> None: def _at_each_angle(self, angle: float) -> None:
self.tomo_scan_projection(angle) self.tomo_scan_projection(angle)
@@ -635,7 +552,7 @@ class LamNI(LamNIOpticsMixin):
print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}") print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress() self._print_progress()
while not successful: while not successful:
self._start_beam_check() self.bl_chk.start()
if not self.special_angles: if not self.special_angles:
self._current_special_angles = [] self._current_special_angles = []
if self._current_special_angles: if self._current_special_angles:
@@ -662,10 +579,10 @@ class LamNI(LamNIOpticsMixin):
for scan_nr in range(start_scan_number, end_scan_number): for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number) self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
if self._was_beam_okay() and not error_caught: if self.bl_chk.stop() and not error_caught:
successful = True successful = True
else: else:
self._wait_for_beamline_checks() self.bl_chk.wait_until_recovered()
def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False): def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False):
"""Return the ii-th golden ratio angle within sorted bunches and its subtomo number.""" """Return the ii-th golden ratio angle within sorted bunches and its subtomo number."""
@@ -1010,4 +927,3 @@ class LamNI(LamNIOpticsMixin):
["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name] ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name]
) )
self.client.logbook.send_logbook_message(msg) self.client.logbook.send_logbook_message(msg)

View File

@@ -51,7 +51,7 @@ class LaMNIInitStages:
self.drive_axis_to_limit(dev.lsamrot, "forward") self.drive_axis_to_limit(dev.lsamrot, "forward")
dev.lsamrot.enabled = False dev.lsamrot.enabled = False
print("Now hard reboot the controller and run the initialization routine again.") print("Now hard reboot the controller and run the initialization routine again.")
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True") print("Remark: The controller will be disabled in bec. It will be enabled by running the init route, \nbut in case needed, to enable manually set dev.lsamrot.enabled=True")
return return
if self.OMNYTools.yesno( if self.OMNYTools.yesno(

View File

@@ -37,6 +37,92 @@ class cSAXSInitSmaractStages:
# ------------------------------ # ------------------------------
# Internal helpers (runtime-based) # Internal helpers (runtime-based)
# ------------------------------ # ------------------------------
def _ensure_all_session_devices_enabled(self, selection: set | None = None, try_enable: bool = True):
"""
Ensure all session devices (or a selection) that define 'bl_smar_stage' are enabled.
Parameters
----------
selection : set | None
If provided, only devices in this set are considered.
try_enable : bool
If True, attempt to set device.enabled = True for devices that expose 'enabled' and are False.
If False, only report status without changing it.
Returns
-------
dict
{
"enabled_now": [device_names enabled by this call],
"already_enabled": [device_names already enabled or without 'enabled' attr],
"failed": [device_names that could not be enabled],
"inaccessible": [device_names not accessible]
}
"""
enabled_now = []
already_enabled = []
failed = []
inaccessible = []
# Build axis map to restrict to SmarAct-based devices (same logic as other helpers)
axis_map = self._build_session_axis_map(selection=selection)
for dev_name in sorted(axis_map.keys()):
try:
d = self._get_device_object(dev_name)
if d is None:
inaccessible.append(dev_name)
logger.warning(f"[cSAXS] Device {dev_name} not accessible.")
continue
# If device has no 'enabled' attribute, treat as already enabled/usable
if not hasattr(d, "enabled"):
already_enabled.append(dev_name)
continue
# If already enabled
try:
if getattr(d, "enabled"):
already_enabled.append(dev_name)
continue
except Exception:
# If reading enabled fails, treat as inaccessible for safety
failed.append(dev_name)
logger.warning(f"[cSAXS] Could not read 'enabled' for {dev_name}.")
continue
# Device exists and is disabled
if try_enable:
try:
logger.info(f"[cSAXS] Enabling device {dev_name} (was disabled).")
setattr(d, "enabled", True)
# small delay to let device initialize if needed
time.sleep(0.05)
if getattr(d, "enabled"):
enabled_now.append(dev_name)
logger.info(f"[cSAXS] Device {dev_name} enabled.")
else:
failed.append(dev_name)
logger.warning(f"[cSAXS] Device {dev_name} still disabled after enabling attempt.")
except Exception as exc:
failed.append(dev_name)
logger.error(f"[cSAXS] Failed to enable {dev_name}: {exc}")
else:
# Not trying to enable, just report
failed.append(dev_name)
except Exception as exc:
failed.append(dev_name)
logger.error(f"[cSAXS] _ensure_all_session_devices_enabled error for {dev_name}: {exc}")
return {
"enabled_now": enabled_now,
"already_enabled": already_enabled,
"failed": failed,
"inaccessible": inaccessible,
}
def _yesno(self, question: str, default: str = "y") -> bool: def _yesno(self, question: str, default: str = "y") -> bool:
""" """
Use OMNYTools.yesno if available; otherwise default to 'yes' (or fallback to input()). Use OMNYTools.yesno if available; otherwise default to 'yes' (or fallback to input()).
@@ -107,6 +193,7 @@ class cSAXSInitSmaractStages:
# ------------------------------ # ------------------------------
# Public API # Public API
# ------------------------------ # ------------------------------
def smaract_reference_stages(self, force: bool = False, devices_to_reference=None): def smaract_reference_stages(self, force: bool = False, devices_to_reference=None):
""" """
Reference SmarAct stages using runtime discovery. Reference SmarAct stages using runtime discovery.
@@ -167,6 +254,19 @@ class cSAXSInitSmaractStages:
devices_to_reference = [devices_to_reference] devices_to_reference = [devices_to_reference]
selection = set(devices_to_reference) if devices_to_reference else None selection = set(devices_to_reference) if devices_to_reference else None
# First: ensure all relevant devices are enabled before attempting referencing
enable_report = self._ensure_all_session_devices_enabled(selection=selection, try_enable=True)
if enable_report["failed"]:
logger.warning(
"[cSAXS] Some devices could not be enabled before referencing: "
+ ", ".join(sorted(enable_report["failed"]))
)
if enable_report["inaccessible"]:
logger.warning(
"[cSAXS] Some devices were inaccessible before referencing: "
+ ", ".join(sorted(enable_report["inaccessible"]))
)
# Build axis map for selected devices (or all devices present) # Build axis map for selected devices (or all devices present)
axis_map = self._build_session_axis_map(selection=selection) axis_map = self._build_session_axis_map(selection=selection)
if selection: if selection:
@@ -174,7 +274,6 @@ class cSAXSInitSmaractStages:
if unknown: if unknown:
print(f"Unknown devices requested or missing 'bl_smar_stage' (ignored): {unknown}") print(f"Unknown devices requested or missing 'bl_smar_stage' (ignored): {unknown}")
newly_referenced = [] newly_referenced = []
already_referenced = [] already_referenced = []
failed = [] failed = []
@@ -191,6 +290,17 @@ class cSAXSInitSmaractStages:
failed.append(dev_name) failed.append(dev_name)
continue continue
# If device exposes 'enabled' and is False, skip (we already tried enabling above)
try:
if hasattr(d, "enabled") and not getattr(d, "enabled"):
print(f"{dev_name}: device disabled, skipping.")
failed.append(dev_name)
continue
except Exception:
print(f"{dev_name}: could not read enabled state, skipping.")
failed.append(dev_name)
continue
try: try:
is_ref = d.controller.axis_is_referenced(ch) is_ref = d.controller.axis_is_referenced(ch)
@@ -246,7 +356,17 @@ class cSAXSInitSmaractStages:
def smaract_check_all_referenced(self): def smaract_check_all_referenced(self):
""" """
Check reference state for all SmarAct devices that define 'bl_smar_stage'. Check reference state for all SmarAct devices that define 'bl_smar_stage'.
This now enables all relevant devices first (attempt), then performs the checks.
""" """
# Attempt to enable all relevant devices first (do not force enabling if you prefer)
enable_report = self._ensure_all_session_devices_enabled(selection=None, try_enable=True)
if enable_report["enabled_now"]:
print("Now enabled devices which were disabled before: " + ", ".join(sorted(enable_report["enabled_now"])))
if enable_report["failed"]:
print("Could not enable: " + ", ".join(sorted(enable_report["failed"])))
if enable_report["inaccessible"]:
print("Inaccessible: " + ", ".join(sorted(enable_report["inaccessible"])))
axis_map = self._build_session_axis_map(selection=None) axis_map = self._build_session_axis_map(selection=None)
for dev_name in sorted(axis_map.keys()): for dev_name in sorted(axis_map.keys()):
ch = axis_map[dev_name] ch = axis_map[dev_name]
@@ -254,6 +374,16 @@ class cSAXSInitSmaractStages:
if d is None: if d is None:
print(f"{dev_name}: device not accessible or unsupported.") print(f"{dev_name}: device not accessible or unsupported.")
continue continue
# Skip devices that expose 'enabled' and are False
try:
if hasattr(d, "enabled") and not getattr(d, "enabled"):
print(f"{dev_name} (axis {ch}) is disabled; skipping reference check.")
continue
except Exception:
print(f"{dev_name} (axis {ch}) enabled-state unknown; skipping.")
continue
try: try:
if d.controller.axis_is_referenced(ch): if d.controller.axis_is_referenced(ch):
print(f"{dev_name} (axis {ch}) is referenced.") print(f"{dev_name} (axis {ch}) is referenced.")
@@ -262,6 +392,7 @@ class cSAXSInitSmaractStages:
except Exception as e: except Exception as e:
print(f"Error checking {dev_name} (axis {ch}): {e}") print(f"Error checking {dev_name} (axis {ch}): {e}")
def smaract_components_to_initial_position(self, devices_to_move=None): def smaract_components_to_initial_position(self, devices_to_move=None):
""" """
Move selected (or all) SmarAct-based components to their configured init_position. Move selected (or all) SmarAct-based components to their configured init_position.

View File

@@ -975,49 +975,36 @@ class FlomniAlignmentMixin:
use_vertical_default_values=True, use_vertical_default_values=True,
): ):
""" """
Read the alignment parameters from xray eye fit. Read the alignment offset from the given directory and set the global parameter
tomo_alignment_fit.
Args:
dir_path (str, optional): The directory to read the alignment offset from. Defaults to os.path.expanduser("~/Data10/specES1/internal/").
""" """
tomo_alignment_fit = np.zeros((2, 5)) tomo_alignment_fit = np.zeros((2, 5))
# with open(os.path.join(dir_path, "ptychotomoalign_Ax.txt"), "r") as file: with open(os.path.join(dir_path, "ptychotomoalign_Ax.txt"), "r") as file:
# tomo_alignment_fit[0][0] = file.readline() tomo_alignment_fit[0][0] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Bx.txt"), "r") as file: with open(os.path.join(dir_path, "ptychotomoalign_Bx.txt"), "r") as file:
# tomo_alignment_fit[0][1] = file.readline() tomo_alignment_fit[0][1] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Cx.txt"), "r") as file: with open(os.path.join(dir_path, "ptychotomoalign_Cx.txt"), "r") as file:
# tomo_alignment_fit[0][2] = file.readline() tomo_alignment_fit[0][2] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Ay.txt"), "r") as file: with open(os.path.join(dir_path, "ptychotomoalign_Ay.txt"), "r") as file:
# tomo_alignment_fit[1][0] = file.readline() tomo_alignment_fit[1][0] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_By.txt"), "r") as file: with open(os.path.join(dir_path, "ptychotomoalign_By.txt"), "r") as file:
# tomo_alignment_fit[1][1] = file.readline() tomo_alignment_fit[1][1] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Cy.txt"), "r") as file: with open(os.path.join(dir_path, "ptychotomoalign_Cy.txt"), "r") as file:
# tomo_alignment_fit[1][2] = file.readline() tomo_alignment_fit[1][2] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Ay3.txt"), "r") as file: with open(os.path.join(dir_path, "ptychotomoalign_Ay3.txt"), "r") as file:
# tomo_alignment_fit[1][3] = file.readline() tomo_alignment_fit[1][3] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Cy3.txt"), "r") as file:
# tomo_alignment_fit[1][4] = file.readline()
params = dev.omny_xray_gui.fit_params_x.get()
#amplitude
tomo_alignment_fit[0][0] = params['SineModel_0_amplitude']
#phase
tomo_alignment_fit[0][1] = params['SineModel_0_shift']
#offset
tomo_alignment_fit[0][2] = params['LinearModel_1_intercept']
print("applying vertical default values from mirror calibration, not from fit!")
tomo_alignment_fit[1][0] = 0
tomo_alignment_fit[1][1] = 0
tomo_alignment_fit[1][2] = 0
tomo_alignment_fit[1][3] = 0
tomo_alignment_fit[1][4] = 0
with open(os.path.join(dir_path, "ptychotomoalign_Cy3.txt"), "r") as file:
tomo_alignment_fit[1][4] = file.readline()
print("New alignment parameters loaded:") print("New alignment parameters loaded:")
print( print(

View File

@@ -1,5 +1,7 @@
import builtins import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose # from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None: if builtins.__dict__.get("bec") is not None:
@@ -7,7 +9,6 @@ if builtins.__dict__.get("bec") is not None:
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans") scans = builtins.__dict__.get("scans")
def umv(*args): def umv(*args):
return scans.umv(*args, relative=False) return scans.umv(*args, relative=False)
@@ -28,7 +29,7 @@ class flomniGuiTools:
def flomnigui_show_gui(self): def flomnigui_show_gui(self):
if "flomni" in self.gui.windows: if "flomni" in self.gui.windows:
self.gui.flomni.raise_window() self.gui.flomni.show()
else: else:
self.gui.new("flomni") self.gui.new("flomni")
@@ -38,38 +39,39 @@ class flomniGuiTools:
def flomnigui_raise(self): def flomnigui_raise(self):
self.gui.flomni.raise_window() self.gui.flomni.raise_window()
# def flomnigui_show_xeyealign(self):
# self.flomnigui_show_gui()
# if self.xeyegui is None:
# self.flomnigui_remove_all_docks()
# self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# # start live
# if not dev.cam_xeye.live_mode:
# dev.cam_xeye.live_mode = True
def flomnigui_show_xeyealign(self): def flomnigui_show_xeyealign(self):
self.flomnigui_show_gui() self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"): if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks() self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("XRayEye", object_name="xrayeye") self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# start live # start live
if not dev.cam_xeye.live_mode_enabled.get(): if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode_enabled.put(True) dev.cam_xeye.live_mode = True
self.xeyegui.switch_tab("alignment")
def flomnigui_show_xeyealign_fittab(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("XRayEye")
self.xeyegui.switch_tab("fit")
def _flomnigui_check_attribute_not_exists(self, attribute_name): def _flomnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui, "flomni"): if hasattr(self.gui,"flomni"):
if hasattr(self.gui.flomni, attribute_name): if hasattr(self.gui.flomni,attribute_name):
return False return False
return True return True
def flomnigui_show_cameras(self): def flomnigui_show_cameras(self):
self.flomnigui_show_gui() self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists( if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"):
"camera_gripper"
) or self._flomnigui_check_attribute_not_exists("camera_overview"):
self.flomnigui_remove_all_docks() self.flomnigui_remove_all_docks()
camera_gripper_image = self.gui.flomni.new("Image") camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper): if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
camera_gripper_image.image(device="cam_flomni_gripper", signal="preview") camera_gripper_image.image(("cam_flomni_gripper", "preview"))
camera_gripper_image.lock_aspect_ratio = True camera_gripper_image.lock_aspect_ratio = True
camera_gripper_image.enable_fps_monitor = True camera_gripper_image.enable_fps_monitor = True
camera_gripper_image.enable_toolbar = False camera_gripper_image.enable_toolbar = False
@@ -78,9 +80,9 @@ class flomniGuiTools:
dev.cam_flomni_gripper.start_live_mode() dev.cam_flomni_gripper.start_live_mode()
else: else:
print("Cannot open camera_gripper. Device does not exist.") print("Cannot open camera_gripper. Device does not exist.")
camera_overview_image = self.gui.flomni.new("Image") camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_overview): if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
camera_overview_image.image(device="cam_flomni_overview", signal="preview") camera_overview_image.image(("cam_flomni_overview", "preview"))
camera_overview_image.lock_aspect_ratio = True camera_overview_image.lock_aspect_ratio = True
camera_overview_image.enable_fps_monitor = True camera_overview_image.enable_fps_monitor = True
camera_overview_image.enable_toolbar = False camera_overview_image.enable_toolbar = False
@@ -91,9 +93,9 @@ class flomniGuiTools:
print("Cannot open camera_overview. Device does not exist.") print("Cannot open camera_overview. Device does not exist.")
def flomnigui_remove_all_docks(self): def flomnigui_remove_all_docks(self):
# dev.cam_flomni_overview.stop_live_mode() #dev.cam_flomni_overview.stop_live_mode()
# dev.cam_flomni_gripper.stop_live_mode() #dev.cam_flomni_gripper.stop_live_mode()
# dev.cam_xeye.live_mode = False #dev.cam_xeye.live_mode = False
self.gui.flomni.delete_all() self.gui.flomni.delete_all()
self.progressbar = None self.progressbar = None
self.text_box = None self.text_box = None
@@ -102,14 +104,15 @@ class flomniGuiTools:
self.flomnigui_show_gui() self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("idle_text_box"): if self._flomnigui_check_attribute_not_exists("idle_text_box"):
self.flomnigui_remove_all_docks() self.flomnigui_remove_all_docks()
idle_text_box = self.gui.flomni.new("TextBox") idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
text = ( text = (
"<pre>" "<pre>"
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n" + "██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗\n"
+ "/ .-'| |' .-. '| `.' || ,'.| || | \n" + "██╔══██╗██╔════╝██╔════╝ ██╔════╝██║ ██╔═══██╗████╗ ████║████╗ ██║██║\n"
+ "| `-,| || | | || |'.'| || |' ' || | \n" + "██████╔╝█████╗ ██║ █████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║██║\n"
+ "| .-'| |' '-' '| | | || | ` || | \n" + "██╔══██╗██╔══╝ ██║ ██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║██║\n"
+ "`--' `--' `-----' `--' `--'`--' `--'`--' \n" + "██████╔╝███████╗╚██████╗ ██║ ███████╗╚██████╔╝██║ ╚═╝ ██║██║ ╚████║██║\n"
+ "╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝\n"
+ "</pre>" + "</pre>"
) )
idle_text_box.set_html_text(text) idle_text_box.set_html_text(text)
@@ -118,12 +121,13 @@ class flomniGuiTools:
import csaxs_bec import csaxs_bec
from pathlib import Path from pathlib import Path
print( print("The general flOMNI documentation is at \nhttps://sls-csaxs.readthedocs.io/en/latest/user/ptychography/flomni.html#user-ptychography-flomni")
"The general flOMNI documentation is at \nhttps://sls-csaxs.readthedocs.io/en/latest/user/ptychography/flomni.html#user-ptychography-flomni"
)
csaxs_bec_basepath = Path(csaxs_bec.__file__).parent csaxs_bec_basepath = Path(csaxs_bec.__file__).parent
docs_folder = csaxs_bec_basepath / "bec_ipython_client" / "plugins" / "flomni" / "docs" docs_folder = (
csaxs_bec_basepath /
"bec_ipython_client" / "plugins" / "flomni" / "docs"
)
if not docs_folder.is_dir(): if not docs_folder.is_dir():
raise NotADirectoryError(f"Docs folder not found: {docs_folder}") raise NotADirectoryError(f"Docs folder not found: {docs_folder}")
@@ -161,9 +165,10 @@ class flomniGuiTools:
self.pdf_viewer = self.gui.flomni.new(widget="PdfViewerWidget") self.pdf_viewer = self.gui.flomni.new(widget="PdfViewerWidget")
# --- Load PDF --------------------------------------------------------- # --- Load PDF ---------------------------------------------------------
self.pdf_viewer.load_pdf(str(pdf_file.resolve())) self.pdf_viewer.PdfViewerWidget.load_pdf(str(pdf_file.resolve()))
print(f"\nLoaded: {pdf_file.name}\n") print(f"\nLoaded: {pdf_file.name}\n")
def _flomnicam_check_device_exists(self, device): def _flomnicam_check_device_exists(self, device):
try: try:
device device
@@ -177,18 +182,26 @@ class flomniGuiTools:
if self._flomnigui_check_attribute_not_exists("progressbar"): if self._flomnigui_check_attribute_not_exists("progressbar"):
self.flomnigui_remove_all_docks() self.flomnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget # Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.flomni.new("RingProgressBar") self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar")
# Customize the size of the progress ring
# Setting multiple rings with different values self.progressbar.set_line_widths(20)
self.progressbar.add_ring().set_update("manual") # Disable automatic updates and manually set the self.progressbar value
self.progressbar.add_ring().set_update("manual") self.progressbar.enable_auto_updates(False)
self.progressbar.add_ring().set_update("scan") # Set precision for the self.progressbar display
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
# Setting multiple rigns with different values
self.progressbar.set_number_of_bars(3)
self.progressbar.rings[0].set_update("manual")
self.progressbar.rings[1].set_update("manual")
self.progressbar.rings[2].set_update("scan")
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.flomni.new(name="progress_text").new("TextBox")
self._flomnigui_update_progress() self._flomnigui_update_progress()
def _flomnigui_update_progress(self): def _flomnigui_update_progress(self):
main_progress_ring = self.progressbar.rings[0]
subtomo_progress_ring = self.progressbar.rings[1]
if self.progressbar is not None: if self.progressbar is not None:
progress = self.progress["projection"] / self.progress["total_projections"] * 100 progress = self.progress["projection"] / self.progress["total_projections"] * 100
subtomo_progress = ( subtomo_progress = (
@@ -196,11 +209,10 @@ class flomniGuiTools:
/ self.progress["subtomo_total_projections"] / self.progress["subtomo_total_projections"]
* 100 * 100
) )
main_progress_ring.set_value(progress) self.progressbar.set_value([progress, subtomo_progress, 0])
subtomo_progress_ring.set_value(subtomo_progress) if self.text_box is not None:
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}" self.text_box.set_plain_text(text)
self.progressbar.set_center_label(text)
if __name__ == "__main__": if __name__ == "__main__":
@@ -211,7 +223,6 @@ if __name__ == "__main__":
client.start() client.start()
client.gui = BECGuiClient() client.gui = BECGuiClient()
flomni_gui = flomniGuiTools() flomni_gui = flomniGuiTools(client)
flomni_gui.set_client(client)
flomni_gui.flomnigui_show_gui() flomni_gui.flomnigui_show_gui()
flomni_gui.flomnigui_show_progress() flomni_gui.flomnigui_show_progress()

View File

@@ -5,11 +5,9 @@ import os
import time import time
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import numpy as np
from bec_lib import bec_logger from bec_lib import bec_logger
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
logger = bec_logger.logger logger = bec_logger.logger
# import builtins to avoid linter errors # import builtins to avoid linter errors
@@ -17,18 +15,16 @@ bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans") scans = builtins.__dict__.get("scans")
def umv(*args): def umv(*args):
return scans.umv(*args, relative=False) return scans.umv(*args, relative=False)
if TYPE_CHECKING: if TYPE_CHECKING:
from bec_ipython_client.plugins.flomni import Flomni from bec_ipython_client.plugins.flomni import Flomni
class XrayEyeAlign: class XrayEyeAlign:
# pixel calibration, multiply to get mm # pixel calibration, multiply to get mm
test_wo_movements = True labview=False
PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning
def __init__(self, client, flomni: Flomni) -> None: def __init__(self, client, flomni: Flomni) -> None:
@@ -40,190 +36,209 @@ class XrayEyeAlign:
self.flomni.reset_correction() self.flomni.reset_correction()
self.flomni.reset_tomo_alignment_fit() self.flomni.reset_tomo_alignment_fit()
@property
def gui(self):
return self.flomni.xeyegui
def _reset_init_values(self): def _reset_init_values(self):
self.shift_xy = [0, 0] self.shift_xy = [0, 0]
self._xray_fov_xy = [0, 0] self._xray_fov_xy = [0, 0]
def update_frame(self, keep_shutter_open=False): def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
if not dev.cam_xeye.live_mode_enabled.get(): def update_frame(self,keep_shutter_open=False):
dev.cam_xeye.live_mode_enabled.put(True) if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
self.gui.on_live_view_enabled(True) if not self.labview:
self.flomni.flomnigui_show_xeyealign()
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
dev.omnyfsh.fshopen() epics_put("XOMNYI-XEYE-ACQ:0", 1)
if self.labview:
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
fshopen()
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
time.sleep(0.5) time.sleep(0.5)
# stop live view # stop live view
if not keep_shutter_open: if not keep_shutter_open:
self.gui.on_live_view_enabled(False) epics_put("XOMNYI-XEYE-ACQ:0", 0)
time.sleep(0.1) time.sleep(0.1)
dev.omnyfsh.fshclose() fshclose()
print("Received new frame.") print("got new frame")
else: else:
print("Staying in live view, shutter is and remains open!") print("Staying in live view, shutter is and remains open!")
def tomo_rotate(self, val: float): def tomo_rotate(self, val: float):
if not self.test_wo_movements: # pylint: disable=undefined-variable
umv(self.device_manager.devices.fsamroy, val) umv(self.device_manager.devices.fsamroy, val)
def get_tomo_angle(self): def get_tomo_angle(self):
return self.device_manager.devices.fsamroy.readback.get() return self.device_manager.devices.fsamroy.readback.get()
def update_fov(self, k: int): def update_fov(self, k: int):
self._xray_fov_xy[0] = max( self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
getattr(dev.omny_xray_gui, f"width_x_{k}").get(), self._xray_fov_xy[0] self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
)
self._xray_fov_xy[1] = max(
getattr(dev.omny_xray_gui, f"width_y_{k}").get(), self._xray_fov_xy[1]
)
def movement_buttons_enabled(self, enablex: bool, enabley: bool): @property
self.gui.on_motors_enable(enablex, enabley) def movement_buttons_enabled(self):
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
@movement_buttons_enabled.setter
def movement_buttons_enabled(self, enabled: bool):
enabled = int(enabled)
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
def send_message(self, msg: str): def send_message(self, msg: str):
print(f"In alginment GUI: {msg}") epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
self.gui.user_message = msg
def align(self, keep_shutter_open=False): def align(self,keep_shutter_open=False):
self.flomni.flomnigui_show_xeyealign()
if not keep_shutter_open: if not keep_shutter_open:
print( print("This routine can be called with paramter keep_shutter_open=True to keep the shutter always open")
"This routine can be called with paramter keep_shutter_open=True to keep the shutter always open"
)
self.send_message("Getting things ready. Please wait...") self.send_message("Getting things ready. Please wait...")
self.gui.enable_submit_button(False) #potential unresolved movement requests to zero
epics_put("XOMNYI-XEYE-MVX:0", 0)
# Initialize xray align device epics_put("XOMNYI-XEYE-MVY:0", 0)
# clear potential pending movement requests
dev.omny_xray_gui.mvx.set(0)
dev.omny_xray_gui.mvy.set(0)
# reset submit channel
dev.omny_xray_gui.submit.set(0)
self.movement_buttons_enabled(False, False)
# reset shift xy and fov params # reset shift xy and fov params
self._reset_init_values() self._reset_init_values()
self.flomni.lights_off() self.flomni.lights_off()
if not self.test_wo_movements: self.flomni.flomnigui_show_xeyealign()
self.tomo_rotate(0) self.flomni.flomnigui_raise()
self.flomni.feye_in() self.tomo_rotate(0)
epics_put("XOMNYI-XEYE-ANGLE:0", 0)
self.flomni.feye_in()
self.flomni.laser_tracker_on() self.flomni.laser_tracker_on()
self.flomni.feedback_enable_with_reset() self.flomni.feedback_enable_with_reset()
# disable movement buttons # disable movement buttons
self.movement_buttons_enabled(False, False) self.movement_buttons_enabled = False
sample_name = self.flomni.sample_get_name(0) sample_name = self.flomni.sample_get_name(0)
self.gui.sample_name = sample_name epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", sample_name)
# this makes sure we are in a defined state # this makes sure we are in a defined state
self.flomni.feedback_disable() self.flomni.feedback_disable()
if not self.test_wo_movements: epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
self.flomni.fosa_out()
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in") self.flomni.fosa_out()
umv(dev.fsamx, fsamx_in - 0.25)
self.flomni.ffzp_in() fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in - 0.25)
self.flomni.ffzp_in()
self.update_frame(keep_shutter_open) self.update_frame(keep_shutter_open)
self.gui.enable_submit_button(True) # enable submit buttons
dev.omny_xray_gui.step.set(0).wait() self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-STEP:0", 0)
self.send_message("Submit center value of FZP.") self.send_message("Submit center value of FZP.")
k = 0 k = 0
while True: while True:
if dev.omny_xray_gui.submit.get() == 1: if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") / 2 * self.PIXEL_CALIBRATION # in mm
self.alignment_values[k] = ( self.alignment_values[k] = val_x
getattr(dev.omny_xray_gui, f"xval_x_{k}").get() / 2 * self.PIXEL_CALIBRATION
) # in mm
print(f"Clicked position {k}: x {self.alignment_values[k]}") print(f"Clicked position {k}: x {self.alignment_values[k]}")
rtx_position = dev.rtx.readback.get() / 1000 rtx_position = dev.rtx.readback.get() / 1000
print(f"Current rtx position {rtx_position}") print(f"Current rtx position {rtx_position}")
self.alignment_values[k] -= rtx_position self.alignment_values[k] -= rtx_position
print(f"Corrected position {k}: x {self.alignment_values[k]}") print(f"Corrected position {k}: x {self.alignment_values[k]}")
# reset submit channel
dev.omny_xray_gui.submit.set(0)
if k == 0: # received center value of FZP if k == 0: # received center value of FZP
self.send_message("please wait ...") self.send_message("please wait ...")
self.movement_buttons_enabled(False, False) self.movement_buttons_enabled = False
self.gui.enable_submit_button(False) epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.flomni.feedback_disable() self.flomni.feedback_disable()
if not self.test_wo_movements: fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in") umv(dev.fsamx, fsamx_in)
umv(dev.fsamx, fsamx_in)
self.flomni.foptics_out() self.flomni.foptics_out()
time.sleep(0.5) self.flomni.feedback_disable()
umv(dev.fsamx, fsamx_in - 0.25)
if self.labview:
self.update_frame(keep_shutter_open)
epics_put("XOMNYI-XEYE-RECBG:0", 1)
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
time.sleep(0.5)
print("waiting for background frame...")
umv(dev.fsamx, fsamx_in)
time.sleep(0.5)
self.flomni.feedback_enable_with_reset() self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open) self.update_frame(keep_shutter_open)
self.send_message("Step 1/5: Adjust sample height and submit center") self.send_message("Adjust sample height and submit center")
self.gui.enable_submit_button(True) epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled(True, True) self.movement_buttons_enabled = True
elif 1 <= k < 5: # received sample center value at samroy 0 ... 315 elif 1 <= k < 5: # received sample center value at samroy 0 ... 315
self.send_message("please wait ...") self.send_message("please wait ...")
self.gui.enable_submit_button(False) epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled(False, False) self.movement_buttons_enabled = False
umv(dev.rtx, 0) umv(dev.rtx, 0)
self.tomo_rotate(k * 45) self.tomo_rotate(k * 45)
dev.omny_xray_gui.angle.set(self.get_tomo_angle()) epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame(keep_shutter_open) self.update_frame(keep_shutter_open)
self.send_message(f"Step {k+1}/5: Submit sample center") self.send_message("Submit sample center")
self.gui.enable_submit_button(True) epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled(True, False) epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
self.update_fov(k) self.update_fov(k)
elif k == 5: # received sample center value at samroy 270 and done elif k == 5: # received sample center value at samroy 270 and done
self.send_message("done...") self.send_message("done...")
self.gui.enable_submit_button(False) epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.movement_buttons_enabled(False, False) self.movement_buttons_enabled = False
self.update_fov(k) self.update_fov(k)
break break
k += 1 k += 1
dev.omny_xray_gui.step.set(k) epics_put("XOMNYI-XEYE-STEP:0", k)
_xrayeyalignmvx = dev.omny_xray_gui.mvx.get() _xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
if _xrayeyalignmvx != 0: if _xrayeyalignmvx != 0:
umvr(dev.rtx, _xrayeyalignmvx) umvr(dev.rtx, _xrayeyalignmvx)
print(f"Current rtx position {dev.rtx.readback.get() / 1000}") print(f"Current rtx position {dev.rtx.readback.get() / 1000}")
epics_put("XOMNYI-XEYE-MVX:0", 0)
if k > 0:
epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000)
time.sleep(3) time.sleep(3)
dev.omny_xray_gui.mvx.set(0)
self.update_frame(keep_shutter_open) self.update_frame(keep_shutter_open)
if k < 2: if k < 2:
# allow movements, store movements to calculate center # allow movements, store movements to calculate center
_xrayeyalignmvy = dev.omny_xray_gui.mvy.get() _xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
if _xrayeyalignmvy != 0: if _xrayeyalignmvy != 0:
self.flomni.feedback_disable() self.flomni.feedback_disable()
if not self.test_wo_movements: umvr(dev.fsamy, _xrayeyalignmvy / 1000)
umvr(dev.fsamy, _xrayeyalignmvy / 1000)
time.sleep(2) time.sleep(2)
dev.omny_xray_gui.mvy.set(0) epics_put("XOMNYI-XEYE-MVY:0", 0)
self.flomni.feedback_enable_with_reset() self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open) self.update_frame(keep_shutter_open)
time.sleep(0.1) time.sleep(0.2)
self.write_output() self.write_output()
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2 fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
@@ -233,17 +248,22 @@ class XrayEyeAlign:
umv(dev.rtx, 0) umv(dev.rtx, 0)
if keep_shutter_open: # free camera
if self.flomni.OMNYTools.yesno("Close the shutter now?", "y"): if self.labview:
dev.omnyfsh.fshclose() epics_put("XOMNYI-XEYE-ACQ:0", 2)
self.gui.on_live_view_enabled(False) if keep_shutter_open and not self.labview:
print("setting 'XOMNYI-XEYE-ACQ:0'") if self.flomni.OMNYTools.yesno("Close the shutter now?","y"):
fshclose()
epics_put("XOMNYI-XEYE-ACQ:0", 0)
if not self.labview:
self.flomni.flomnigui_idle()
print( print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy" f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy"
f" = {fovy:.0f} microns" f" = {fovy:.0f} microns"
) )
print("Check the fit in the GUI...") print("Use the matlab routine to FIT the current alignment...")
print("Then LOAD ALIGNMENT PARAMETERS by running flomni.read_alignment_offset()\n") print("Then LOAD ALIGNMENT PARAMETERS by running flomni.read_alignment_offset()\n")
@@ -251,32 +271,9 @@ class XrayEyeAlign:
file = os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues") file = os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues")
if not os.path.exists(file): if not os.path.exists(file):
os.makedirs(os.path.dirname(file), exist_ok=True) os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, "w") as alignment_values_file: with open(file, "w") as alignment_values_file:
alignment_values_file.write("angle\thorizontal\n") alignment_values_file.write("angle\thorizontal\n")
# Initialize an empty list to store fovx values
fovx_list = []
fovx_offsets = np.zeros(5) # holds offsets for k = 1..5
for k in range(1, 6): for k in range(1, 6):
fovx_offset = self.alignment_values[0] - self.alignment_values[k] fovx_offset = self.alignment_values[0] - self.alignment_values[k]
fovx_offsets[k - 1] = fovx_offset # store in array
fovx_x = (k - 1) * 45
fovx_list.append([fovx_x, fovx_offset * 1000]) # Append the data to the list
print(f"Writing to file new alignment: number {k}, value x {fovx_offset}") print(f"Writing to file new alignment: number {k}, value x {fovx_offset}")
alignment_values_file.write(f"{fovx_x}\t{fovx_offset * 1000}\n") alignment_values_file.write(f"{(k-1)*45}\t{fovx_offset*1000}\n")
# Now build final numpy array:
data = np.array(
[
[0, 45, 90, 135, 180], # angles
fovx_offsets * 1000, # fovx_offset values
[0, 0, 0, 0, 0],
]
)
self.gui.submit_fit_array(data)
print(f"fit submited with {data}")
# self.flomni.flomnigui_show_xeyealign_fittab()

View File

@@ -1,5 +1,7 @@
import builtins import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose # from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None: if builtins.__dict__.get("bec") is not None:
@@ -19,7 +21,7 @@ class OMNYGuiTools:
def __init__(self, client): def __init__(self, client):
self.gui = getattr(client, "gui", None) self.gui = getattr(client, "gui", None)
self.gui_window = self.gui.windows["main"].widget self.gui_window = self.gui.windows['main'].widget
self.fig200 = None self.fig200 = None
self.fig201 = None self.fig201 = None
self.fig202 = None self.fig202 = None
@@ -81,16 +83,12 @@ class OMNYGuiTools:
pass pass
text = ( text = (
"<pre>" "<pre>"
+ " ,o888888o. ,8. ,8. b. 8 `8.`8888. ,8' \n" + "██████╗ ███████╗ ██████╗ ██████╗ ███╗ ███╗███╗ ██╗██╗ ██╗\n"
+ " . 8888 `88. ,888. ,888. 888o. 8 `8.`8888. ,8' \n" + "██╔══██╗██╔════╝██╔════╝ ██╔═══██╗████╗ ████║████╗ ██║╚██╗ ██╔╝\n"
+ ",8 8888 `8b .`8888. .`8888. Y88888o. 8 `8.`8888. ,8' \n" + "██████╔╝█████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║ ╚████╔╝ \n"
+ "88 8888 `8b ,8.`8888. ,8.`8888. .`Y888888o. 8 `8.`8888.,8' \n" + "██╔══██╗██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║ ╚██╔╝ \n"
+ "88 8888 88 ,8'8.`8888,8^8.`8888. 8o. `Y888888o. 8 `8.`88888' \n" + "██████╔╝███████╗╚██████╗ ╚██████╔╝██║ ╚═╝ ██║██║ ╚████║ ██║ \n"
+ "88 8888 88 ,8' `8.`8888' `8.`8888. 8`Y8o. `Y88888o8 `8. 8888 \n" + "╚═════╝ ╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝ \n"
+ "88 8888 ,8P ,8' `8.`88' `8.`8888. 8 `Y8o. `Y8888 `8 8888 \n"
+ "`8 8888 ,8P ,8' `8.`' `8.`8888. 8 `Y8o. `Y8 8 8888 \n"
+ " ` 8888 ,88' ,8' `8 `8.`8888. 8 `Y8o.` 8 8888 \n"
+ " `8888888P' ,8' ` `8.`8888. 8 `Yo 8 8888 \n"
+ "</pre>" + "</pre>"
) )
self.idle_text_box.set_html_text(text) self.idle_text_box.set_html_text(text)
@@ -137,9 +135,7 @@ class OMNYGuiTools:
if self.progressbar is None: if self.progressbar is None:
self.omnygui_remove_all_docks() self.omnygui_remove_all_docks()
# Add a new dock with a RingProgressBar widget # Add a new dock with a RingProgressBar widget
self.progressbar = self.gui_window.add_dock(name="progress").add_widget( self.progressbar = self.gui_window.add_dock(name="progress").add_widget("RingProgressBar")
"RingProgressBar"
)
# Customize the size of the progress ring # Customize the size of the progress ring
self.progressbar.set_line_widths(20) self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value # Disable automatic updates and manually set the self.progressbar value

View File

@@ -1,23 +1,29 @@
import time import builtins
import numpy as np import datetime
import sys
import termios
import tty
import fcntl import fcntl
import os import os
import builtins import subprocess
import sys
import termios
import threading
import time
import tty
from pathlib import Path
import numpy as np
from bec_lib import bec_logger
from rich import box from rich import box
from rich.console import Console from rich.console import Console
from rich.table import Table from rich.table import Table
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None: if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans") scans = builtins.__dict__.get("scans")
def umv(*args): def umv(*args):
return scans.umv(*args, relative=False) return scans.umv(*args, relative=False)
@@ -110,24 +116,20 @@ class OMNYTools:
next1, next2 = sys.stdin.read(2) next1, next2 = sys.stdin.read(2)
if next1 == "[": if next1 == "[":
if next2 == "A": if next2 == "A":
# print("up")
if dev2 != "none": if dev2 != "none":
umvr(dev2, step2) umvr(dev2, step2)
if special_command != "none": if special_command != "none":
special_command() special_command()
elif next2 == "B": elif next2 == "B":
# print(" down")
if dev2 != "none": if dev2 != "none":
umvr(dev2, -step2) umvr(dev2, -step2)
if special_command != "none": if special_command != "none":
special_command() special_command()
elif next2 == "C": elif next2 == "C":
# print("right")
umvr(dev1, step1) umvr(dev1, step1)
if special_command != "none": if special_command != "none":
special_command() special_command()
elif next2 == "D": elif next2 == "D":
# print("left")
umvr(dev1, -step1) umvr(dev1, -step1)
if special_command != "none": if special_command != "none":
special_command() special_command()
@@ -143,13 +145,324 @@ class OMNYTools:
step2 = step2 / 2 step2 = step2 / 2
print(f"\rHalf step size. New step size: {step1}, {step2}\r") print(f"\rHalf step size. New step size: {step1}, {step2}\r")
except IOError: except IOError:
# No input available, keep looping
pass pass
# Sleep for a short period to avoid high CPU usage
time.sleep(0.02) time.sleep(0.02)
finally: finally:
# Restore the terminal to its original state # Restore the terminal to its original state
termios.tcsetattr(fd, termios.TCSADRAIN, old_term) termios.tcsetattr(fd, termios.TCSADRAIN, old_term)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags) fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)
import socket
class BeamlineChecker:
"""Monitors beamline health during scans.
Runs checks in a background thread and blocks scan progress
until beam conditions are restored if they fail.
Usage:
checker = BeamlineChecker(client)
checker._bl_chk_start()
# ... run scan ...
beam_was_ok = checker._bl_chk_stop()
if not beam_was_ok:
checker._bl_chk_wait_until_recovered()
"""
def __init__(self, client):
self.client = client
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
self._beam_is_okay = True
self._stop_event = None
self._thread = None
self._local_network_warned = False
self._check_msgs = []
# ------------------------------------------------------------------
# Public control interface
# ------------------------------------------------------------------
def bl_chk_status(self):
"""Print and return the current enabled/disabled state of all checks."""
if self._is_local_network():
print("Beamline checks cannot be performed on this network (129.129.98.x) — skipping.")
return {}
status = {
"shutter": self.check_shutter,
"fofb": self.check_fofb,
"light available": self.check_light_available,
}
print(
f"Shutter: {self.check_shutter}\n"
f"FOFB: {self.check_fofb}\n"
f"Light available: {self.check_light_available}"
)
return status
def bl_chk_enable_all(self):
"""Enable all beamline checks."""
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
self.bl_chk_status()
def bl_chk_disable_all(self):
"""Disable all beamline checks."""
self.check_shutter = False
self.check_light_available = False
self.check_fofb = False
self.bl_chk_status()
def bl_chk_enable_shutter(self):
"""Enable the shutter check."""
self.check_shutter = True
self.bl_chk_status()
def bl_chk_disable_shutter(self):
"""Disable the shutter check."""
self.check_shutter = False
self.bl_chk_status()
def bl_chk_enable_fofb(self):
"""Enable the fast orbit feedback check."""
self.check_fofb = True
self.bl_chk_status()
def bl_chk_disable_fofb(self):
"""Disable the fast orbit feedback check."""
self.check_fofb = False
self.bl_chk_status()
def bl_chk_enable_light(self):
"""Enable the light available check."""
self.check_light_available = True
self.bl_chk_status()
def bl_chk_disable_light(self):
"""Disable the light available check."""
self.check_light_available = False
self.bl_chk_status()
def _bl_chk_start(self):
"""Start the background beam check thread."""
self._beam_is_okay = True
self._stop_event = threading.Event()
self._thread = threading.Thread(target=self._poll, daemon=True)
self._thread.start()
def _bl_chk_stop(self) -> bool:
"""Stop the background thread and return whether beam was okay throughout."""
self._stop_event.set()
self._thread.join()
return self._beam_is_okay
def _bl_chk_wait_until_recovered(self):
"""Block until all beamline checks pass again, logging to SciLog."""
self._log_failure_to_scilog()
while True:
self._beam_is_okay = True
self._check_msgs = self._run_checks()
if self._beam_is_okay:
break
self._print_msgs()
time.sleep(1)
self._log_recovery_to_scilog()
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _is_local_network(self) -> bool:
"""Return True if running on the 129.129.98.x subnet."""
try:
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
return ip.startswith("129.129.98.")
except Exception:
return False
def _run_checks(self) -> list:
if self._is_local_network():
if not self._local_network_warned:
print("Beamline checks cannot be performed on this network (129.129.98.x) — skipping.")
self._local_network_warned = True
return []
msgs = []
dev = builtins.__dict__.get("dev")
try:
if self.check_shutter:
val = dev.x12sa_es1_shutter_status.read(cached=True)
if val["value"].lower() != "open":
self._beam_is_okay = False
msgs.append("Check beam failed: Shutter is closed.")
if self.check_light_available:
val = dev.sls_machine_status.read(cached=True)
if val["value"] not in ["Light Available", "Light-Available"]:
self._beam_is_okay = False
msgs.append("Check beam failed: Light not available.")
if self.check_fofb:
val = dev.sls_fast_orbit_feedback.read(cached=True)
if val["value"] != "running":
self._beam_is_okay = False
msgs.append("Check beam failed: Fast orbit feedback is not running.")
except Exception:
logger.warning("Failed to check beam.")
return msgs
def _poll(self):
while not self._stop_event.is_set():
self._check_msgs = self._run_checks()
if not self._beam_is_okay:
self._stop_event.set()
time.sleep(1)
def _print_msgs(self):
for msg in self._check_msgs:
logger.warning(msg)
def _log_failure_to_scilog(self):
self._print_msgs()
try:
bec = builtins.__dict__.get("bec")
msg = bec.logbook.LogbookMessage()
msg.add_text(
"<p><mark class='pen-red'><strong>Beamline checks failed at"
f" {str(datetime.datetime.now())}: {''.join(self._check_msgs)}</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send beam failure update to SciLog.")
def _log_recovery_to_scilog(self):
try:
bec = builtins.__dict__.get("bec")
msg = bec.logbook.LogbookMessage()
msg.add_text(
"<p><mark class='pen-red'><strong>Operation resumed at"
f" {str(datetime.datetime.now())}.</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send beam recovery update to SciLog.")
class PtychoReconstructor:
"""Writes ptychography reconstruction queue files after each scan projection.
An external reconstruction engine monitors the queue folder and picks
up .dat files as they are written.
Usage:
reconstructor = PtychoReconstructor(folder_name="reconstruction_queue")
reconstructor.write(
scan_list=[1023, 1024],
next_scan_number=1025,
base_path="~/data/raw",
)
"""
def __init__(self, folder_name: str = "reconstruction_queue"):
self.folder_name = folder_name
def _accounts_match(self) -> bool:
"""Check if bec.active_account matches the current system user (p vs e prefix)."""
try:
bec = builtins.__dict__.get("bec")
active = bec.active_account # e.g. "p23092"
system_user = os.getenv("USER") or os.getlogin() # e.g. "e23092"
print(f"Active server account {active}, BEC client account {system_user}.")
return active[1:] == system_user[1:]
except Exception:
logger.warning("Failed to compare active account to system user.")
return False
def write(self, scan_list: list, next_scan_number: int, base_path: str = "~/data/raw/analysis/"):
"""Write a reconstruction queue file for the given scan list.
Args:
scan_list (list): Scan numbers belonging to this projection
(may contain multiple entries when stitching).
next_scan_number (int): The current next scan number, used to
name the queue file.
base_path (str): Root path under which the queue folder lives.
"""
base_path = os.path.expanduser(base_path)
queue_path = Path(os.path.join(base_path, self.folder_name))
queue_path.mkdir(parents=True, exist_ok=True)
last_scan_number = next_scan_number - 1
queue_file = os.path.abspath(
os.path.join(queue_path, f"scan_{last_scan_number:05d}.dat")
)
with open(queue_file, "w") as f:
scans = " ".join(str(s) for s in scan_list)
f.write(f"p.scan_number {scans}\n")
f.write("p.check_nextscan_started 1\n")
class TomoIDManager:
"""Registers a tomography measurement in the OMNY sample database
and returns its assigned tomo ID.
Usage:
id_manager = TomoIDManager()
tomo_id = id_manager.register(
sample_name="my_sample",
date="2024-03-08",
eaccount="e12345",
scan_number=1001,
setup="lamni",
additional_info="test info",
user="BEC",
)
"""
OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
OMNY_USER = "omny"
OMNY_PASSWORD = "samples"
TMP_FILE = "/tmp/currsamplesnr.txt"
def register(
self,
sample_name: str,
date: str,
eaccount: str,
scan_number: int,
setup: str,
additional_info: str,
user: str,
) -> int:
"""Register a new measurement and return the assigned tomo ID.
Args:
sample_name (str): Name of the sample.
date (str): Date string (e.g. "2024-03-08").
eaccount (str): E-account identifier.
scan_number (int): First scan number of the measurement.
setup (str): Setup name (e.g. "lamni").
additional_info (str): Any additional sample information.
user (str): User name.
Returns:
int: The tomo ID assigned by the OMNY database.
"""
url = (
f"{self.OMNY_URL}"
f"?sample={sample_name}"
f"&date={date}"
f"&eaccount={eaccount}"
f"&scannr={scan_number}"
f"&setup={setup}"
f"&additional={additional_info}"
f"&user={user}"
)
subprocess.run(
f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
f" -q -O {self.TMP_FILE} '{url}'",
shell=True,
)
with open(self.TMP_FILE) as f:
return int(f.read())

View File

@@ -111,3 +111,34 @@ bec._beamline_mixin._bl_info_register(OperatorInfo)
# SETUP PROMPTS # SETUP PROMPTS
bec._ip.prompts.session_name = _session_name bec._ip.prompts.session_name = _session_name
bec._ip.prompts.status = 1 bec._ip.prompts.status = 1
# ACCOUNT MISMATCH CHECK
import os
def _check_account_mismatch():
try:
active = bec.active_account # e.g. "p23092"
system_user = os.getenv("USER") or os.getlogin() # e.g. "e23092"
if active[1:] != system_user[1:]:
print(f"""
\033[91m\033[1m
██╗ ██╗ █████╗ ██████╗ ███╗ ██╗██╗███╗ ██╗ ██████╗
██║ ██║██╔══██╗██╔══██╗████╗ ██║██║████╗ ██║██╔════╝
██║ █╗ ██║███████║██████╔╝██╔██╗ ██║██║██╔██╗ ██║██║ ███╗
██║███╗██║██╔══██║██╔══██╗██║╚██╗██║██║██║╚██╗██║██║ ██║
╚███╔███╔╝██║ ██║██║ ██║██║ ╚████║██║██║ ╚████║╚██████╔╝
╚══╝╚══╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝╚═╝ ╚═══╝ ╚═════╝
ACCOUNT MISMATCH DETECTED!
BEC active account : {active}
System user : {system_user}
Data read and written by the BEC client does not match the data account!
Please verify you are logged in with the correct account.
\033[0m""")
except Exception:
logger.warning("Failed to verify account match.")
if _args.session.lower() == "lamni" or _args.session.lower() == "flomni" or _args.session.lower() == "omny":
_check_account_mismatch()

View File

@@ -13,15 +13,24 @@ logger = bec_logger.logger
_Widgets = { _Widgets = {
"OmnyAlignment": "OmnyAlignment",
"XRayEye": "XRayEye", "XRayEye": "XRayEye",
} }
class XRayEye(RPCBase): class OmnyAlignment(RPCBase):
@property
@rpc_call @rpc_call
def active_roi(self) -> "BaseROI | None": def enable_live_view(self):
""" """
Return the currently active ROI, or None if no ROI is active. None
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
None
""" """
@property @property
@@ -38,30 +47,6 @@ class XRayEye(RPCBase):
None None
""" """
@rpc_call
def on_live_view_enabled(self, enabled: "bool"):
"""
None
"""
@rpc_call
def on_motors_enable(self, x_enable: "bool", y_enable: "bool"):
"""
Enable/Disable motor controls
Args:
x_enable(bool): enable x motor controls
y_enable(bool): enable y motor controls
"""
@rpc_call
def enable_submit_button(self, enable: "bool"):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
@property @property
@rpc_call @rpc_call
def sample_name(self): def sample_name(self):
@@ -90,14 +75,66 @@ class XRayEye(RPCBase):
None None
""" """
class XRayEye(RPCBase):
@rpc_call @rpc_call
def switch_tab(self, tab: "str"): def active_roi(self) -> "BaseROI | None":
"""
Return the currently active ROI, or None if no ROI is active.
"""
@property
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@property
@rpc_call
def user_message(self):
""" """
None None
""" """
@user_message.setter
@rpc_call @rpc_call
def submit_fit_array(self, fit_array): def user_message(self):
"""
None
"""
@property
@rpc_call
def sample_name(self):
"""
None
"""
@sample_name.setter
@rpc_call
def sample_name(self):
"""
None
"""
@property
@rpc_call
def enable_move_buttons(self):
"""
None
"""
@enable_move_buttons.setter
@rpc_call
def enable_move_buttons(self):
""" """
None None
""" """
@@ -109,15 +146,3 @@ class XRayEye2DControl(RPCBase):
""" """
Cleanup the BECConnector Cleanup the BECConnector
""" """
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""

View File

@@ -0,0 +1,140 @@
from typing import TypedDict
from bec_widgets.utils.error_popups import SafeSlot
import os
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QWidget, QPushButton, QLineEdit, QLabel, QVBoxLayout
from bec_qthemes import material_icon
from bec_lib.logger import bec_logger
logger = bec_logger.logger
# class OmnyAlignmentUIComponents(TypedDict):
# moveRightButton: QPushButton
# moveLeftButton: QPushButton
# moveUpButton: QPushButton
# moveDownButton: QPushButton
# image: Image
class OmnyAlignment(BECWidget, QWidget):
USER_ACCESS = ["enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter","sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
ui_file = "./omny_alignment.ui"
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.ui.moveRightButton.setText("")
self.ui.moveRightButton.setIcon(
material_icon(icon_name="keyboard_arrow_right", **icon_options)
)
self.ui.moveLeftButton.setText("")
self.ui.moveLeftButton.setIcon(
material_icon(icon_name="keyboard_arrow_left", **icon_options)
)
self.ui.moveUpButton.setText("")
self.ui.moveUpButton.setIcon(
material_icon(icon_name="keyboard_arrow_up", **icon_options)
)
self.ui.moveDownButton.setText("")
self.ui.moveDownButton.setIcon(
material_icon(icon_name="keyboard_arrow_down", **icon_options)
)
self.ui.confirmButton.setText("OK")
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
@property
def enable_live_view(self):
return self.ui.liveViewSwitch.checked
@enable_live_view.setter
def enable_live_view(self, enable:bool):
self.ui.liveViewSwitch.checked = enable
@property
def user_message(self):
return self.ui.messageLineEdit.text()
@user_message.setter
def user_message(self, message:str):
self.ui.messageLineEdit.setText(message)
@property
def sample_name(self):
return self.ui.sampleLineEdit.text()
@sample_name.setter
def sample_name(self, message:str):
self.ui.sampleLineEdit.setText(message)
@SafeSlot(bool)
def on_live_view_enabled(self, enabled:bool):
from bec_widgets.widgets.plots.image.image import Image
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam_xeye")
return
image.disconnect_monitor("cam_xeye")
@property
def enable_move_buttons(self):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
return move_up.isEnabled() and move_down.isEnabled() and move_left.isEnabled() and move_right.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled:bool):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
move_up.setEnabled(enabled)
move_down.setEnabled(enabled)
move_left.setEnabled(enabled)
move_right.setEnabled(enabled)
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
widget = OmnyAlignment()
widget.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1 @@
{'files': ['omny_alignment.py']}

View File

@@ -0,0 +1,125 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>988</width>
<height>821</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="2" column="2">
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="2">
<widget class="QPushButton" name="moveRightButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QPushButton" name="moveLeftButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QPushButton" name="moveUpButton">
<property name="text">
<string>Up</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QPushButton" name="moveDownButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QPushButton" name="confirmButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="2" column="0">
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="1">
<widget class="QLineEdit" name="sampleLineEdit"/>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="messageLineEdit"/>
</item>
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Sample</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Message</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="1" column="0" colspan="3">
<widget class="Image" name="image">
<property name="enable_toolbar" stdset="0">
<bool>false</bool>
</property>
<property name="inner_axes" stdset="0">
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam_xeye</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>
</property>
</widget>
</item>
<item row="0" column="3">
<widget class="ToggleSwitch" name="liveViewSwitch"/>
</item>
<item row="0" column="2">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Live View</string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
</property>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>Image</class>
<extends>QWidget</extends>
<header>image</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -0,0 +1,54 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment import OmnyAlignment
DOM_XML = """
<ui language='c++'>
<widget class='OmnyAlignment' name='omny_alignment'>
</widget>
</ui>
"""
class OmnyAlignmentPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = OmnyAlignment(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(OmnyAlignment.ICON_NAME)
def includeFile(self):
return "omny_alignment"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "OmnyAlignment"
def toolTip(self):
return "OmnyAlignment"
def whatsThis(self):
return self.toolTip()

View File

@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment_plugin import OmnyAlignmentPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(OmnyAlignmentPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -5,7 +5,6 @@ from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon from bec_qthemes import material_icon
from bec_widgets import BECWidget, SafeProperty, SafeSlot from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.widgets.plots.image.image import Image from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
@@ -22,10 +21,7 @@ from qtpy.QtWidgets import (
QToolButton, QToolButton,
QVBoxLayout, QVBoxLayout,
QWidget, QWidget,
QTextEdit,
QTabWidget,
) )
import time
logger = bec_logger.logger logger = bec_logger.logger
CAMERA = ("cam_xeye", "image") CAMERA = ("cam_xeye", "image")
@@ -45,38 +41,38 @@ class XRayEye2DControl(BECWidget, QWidget):
""") """)
# Up # Up
self.move_up_button = QToolButton(parent=self) self.move_up_button = QToolButton(parent=self)
self.move_up_button.setIcon(material_icon("keyboard_double_arrow_up")) self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
self.root_layout.addWidget(self.move_up_button, 0, 2) self.root_layout.addWidget(self.move_up_button, 0, 2)
# Up tweak button # Up tweak button
self.move_up_tweak_button = QToolButton(parent=self) self.move_up_tweak_button = QToolButton(parent=self)
self.move_up_tweak_button.setIcon(material_icon("keyboard_arrow_up")) self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2) self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
# Left # Left
self.move_left_button = QToolButton(parent=self) self.move_left_button = QToolButton(parent=self)
self.move_left_button.setIcon(material_icon("keyboard_double_arrow_left")) self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
self.root_layout.addWidget(self.move_left_button, 2, 0) self.root_layout.addWidget(self.move_left_button, 2, 0)
# Left tweak button # Left tweak button
self.move_left_tweak_button = QToolButton(parent=self) self.move_left_tweak_button = QToolButton(parent=self)
self.move_left_tweak_button.setIcon(material_icon("keyboard_arrow_left")) self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1) self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
# Right # Right
self.move_right_button = QToolButton(parent=self) self.move_right_button = QToolButton(parent=self)
self.move_right_button.setIcon(material_icon("keyboard_double_arrow_right")) self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
self.root_layout.addWidget(self.move_right_button, 2, 4) self.root_layout.addWidget(self.move_right_button, 2, 4)
# Right tweak button # Right tweak button
self.move_right_tweak_button = QToolButton(parent=self) self.move_right_tweak_button = QToolButton(parent=self)
self.move_right_tweak_button.setIcon(material_icon("keyboard_arrow_right")) self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3) self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
# Down # Down
self.move_down_button = QToolButton(parent=self) self.move_down_button = QToolButton(parent=self)
self.move_down_button.setIcon(material_icon("keyboard_double_arrow_down")) self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
self.root_layout.addWidget(self.move_down_button, 4, 2) self.root_layout.addWidget(self.move_down_button, 4, 2)
# Down tweak button # Down tweak button
self.move_down_tweak_button = QToolButton(parent=self) self.move_down_tweak_button = QToolButton(parent=self)
self.move_down_tweak_button.setIcon(material_icon("keyboard_arrow_down")) self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2) self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
# Connections # Connections
@@ -128,20 +124,8 @@ class XRayEye2DControl(BECWidget, QWidget):
class XRayEye(BECWidget, QWidget): class XRayEye(BECWidget, QWidget):
USER_ACCESS = [ USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
"active_roi", "sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
"user_message",
"user_message.setter",
"on_live_view_enabled",
"on_motors_enable",
"enable_submit_button",
"sample_name",
"sample_name.setter",
"enable_move_buttons",
"enable_move_buttons.setter",
"switch_tab",
"submit_fit_array",
]
PLUGIN = True PLUGIN = True
def __init__(self, parent=None, **kwargs): def __init__(self, parent=None, **kwargs):
@@ -152,44 +136,28 @@ class XRayEye(BECWidget, QWidget):
self._make_connections() self._make_connections()
# Connection to redis endpoints # Connection to redis endpoints
self.bec_dispatcher.connect_slot( self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
self.getting_shutter_status, MessageEndpoints.device_readback("omnyfsh")
)
self.bec_dispatcher.connect_slot(
self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0])
)
self.connect_motors() self.connect_motors()
self.resize(800, 600) self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger) QTimer.singleShot(0, self._init_gui_trigger)
def _init_ui(self): def _init_ui(self):
self.root_layout = QVBoxLayout(self) self.core_layout = QHBoxLayout(self)
self.tab_widget = QTabWidget(parent=self)
self.root_layout.addWidget(self.tab_widget)
self.alignment_tab = QWidget(parent=self) self.image = Image(parent=self)
self.core_layout = QHBoxLayout(self.alignment_tab) self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
self.image = Image(parent=self.alignment_tab)
self.image.color_map = "CET-L2"
self.image.enable_toolbar = (
False # Disable default toolbar to not allow to user set anything
)
self.image.inner_axes = False # Disable inner axes to maximize image area self.image.inner_axes = False # Disable inner axes to maximize image area
self.image.enable_full_colorbar = True self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
self.image.invert_y = True # Invert y axis to match image coordinates
# Control panel on the right: vertical layout inside a fixed-width widget # Control panel on the right: vertical layout inside a fixed-width widget
self.control_panel = QWidget(parent=self.alignment_tab) self.control_panel = QWidget(parent=self)
self.control_panel_layout = QVBoxLayout(self.control_panel) self.control_panel_layout = QVBoxLayout(self.control_panel)
self.control_panel_layout.setContentsMargins(0, 0, 0, 0) self.control_panel_layout.setContentsMargins(0, 0, 0, 0)
self.control_panel_layout.setSpacing(10) self.control_panel_layout.setSpacing(10)
# ROI toolbar + Live toggle (header row) # ROI toolbar + Live toggle (header row)
self.roi_manager = ROIPropertyTree( self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
parent=self, image_widget=self.image, compact=True, compact_orientation="horizontal" compact_orientation="horizontal")
)
header_row = QHBoxLayout() header_row = QHBoxLayout()
header_row.setContentsMargins(0, 0, 0, 0) header_row.setContentsMargins(0, 0, 0, 0)
header_row.setSpacing(8) header_row.setSpacing(8)
@@ -198,36 +166,16 @@ class XRayEye(BECWidget, QWidget):
self.live_preview_label = QLabel("Live Preview", parent=self) self.live_preview_label = QLabel("Live Preview", parent=self)
self.live_preview_toggle = ToggleSwitch(parent=self) self.live_preview_toggle = ToggleSwitch(parent=self)
self.live_preview_toggle.checked = False self.live_preview_toggle.checked = False
header_row.addWidget(self.live_preview_label, 0, Qt.AlignmentFlag.AlignVCenter) header_row.addWidget(self.live_preview_label, 0, Qt.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignmentFlag.AlignVCenter) header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignVCenter)
self.control_panel_layout.addLayout(header_row) self.control_panel_layout.addLayout(header_row)
switch_row = QHBoxLayout()
switch_row.setContentsMargins(0, 0, 0, 0)
switch_row.setSpacing(8)
switch_row.addStretch()
self.camera_running_label = QLabel("Camera running", parent=self)
self.camera_running_toggle = ToggleSwitch(parent=self)
# self.camera_running_toggle.checked = False
self.camera_running_toggle.enabled.connect(self.camera_running_enabled)
self.shutter_label = QLabel("Shutter open", parent=self)
self.shutter_toggle = ToggleSwitch(parent=self)
# self.shutter_toggle.checked = False
self.shutter_toggle.enabled.connect(self.opening_shutter)
switch_row.addWidget(self.shutter_label, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.shutter_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.camera_running_label, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.camera_running_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
self.control_panel_layout.addLayout(switch_row)
# separator # separator
self.control_panel_layout.addWidget(self._create_separator()) self.control_panel_layout.addWidget(self._create_separator())
# 2D Positioner (fixed size) # 2D Positioner (fixed size)
self.motor_control_2d = XRayEye2DControl(parent=self) self.motor_control_2d = XRayEye2DControl(parent=self)
self.control_panel_layout.addWidget( self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignTop | Qt.AlignCenter)
self.motor_control_2d, 0, Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignCenter
)
# separator # separator
self.control_panel_layout.addWidget(self._create_separator()) self.control_panel_layout.addWidget(self._create_separator())
@@ -242,8 +190,9 @@ class XRayEye(BECWidget, QWidget):
# Submit button # Submit button
self.submit_button = QPushButton("Submit", parent=self) self.submit_button = QPushButton("Submit", parent=self)
# Add to layout form # Add to layout form
step_size_form.addWidget(QLabel("Step Size", parent=self), 0, 0) step_size_form.addWidget(QLabel("Horizontal", parent=self), 0, 0)
step_size_form.addWidget(self.step_size, 0, 1) step_size_form.addWidget(self.step_size, 0, 1)
step_size_form.addWidget(QLabel("Vertical", parent=self), 1, 0)
step_size_form.addWidget(self.submit_button, 2, 0, 1, 2) step_size_form.addWidget(self.submit_button, 2, 0, 1, 2)
# Add form to control panel # Add form to control panel
@@ -258,8 +207,7 @@ class XRayEye(BECWidget, QWidget):
self.sample_name_line_edit.setReadOnly(True) self.sample_name_line_edit.setReadOnly(True)
form.addWidget(QLabel("Sample", parent=self), 0, 0) form.addWidget(QLabel("Sample", parent=self), 0, 0)
form.addWidget(self.sample_name_line_edit, 0, 1) form.addWidget(self.sample_name_line_edit, 0, 1)
self.message_line_edit = QTextEdit(parent=self) self.message_line_edit = QLineEdit(parent=self)
self.message_line_edit.setFixedHeight(60)
self.message_line_edit.setReadOnly(True) self.message_line_edit.setReadOnly(True)
form.addWidget(QLabel("Message", parent=self), 1, 0) form.addWidget(QLabel("Message", parent=self), 1, 0)
form.addWidget(self.message_line_edit, 1, 1) form.addWidget(self.message_line_edit, 1, 1)
@@ -269,57 +217,12 @@ class XRayEye(BECWidget, QWidget):
self.control_panel.adjustSize() self.control_panel.adjustSize()
p_hint = self.control_panel.sizeHint() p_hint = self.control_panel.sizeHint()
self.control_panel.setFixedWidth(p_hint.width()) self.control_panel.setFixedWidth(p_hint.width())
self.control_panel.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Expanding) self.control_panel.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
# Core Layout: image (expanding) | control panel (fixed) # Core Layout: image (expanding) | control panel (fixed)
self.core_layout.addWidget(self.image) self.core_layout.addWidget(self.image)
self.core_layout.addWidget(self.control_panel) self.core_layout.addWidget(self.control_panel)
self.tab_widget.addTab(self.alignment_tab, "Alignment")
self.fit_tab = QWidget(parent=self)
self.fit_layout = QVBoxLayout(self.fit_tab)
self.waveform_x = Waveform(parent=self.fit_tab)
self.waveform_y = Waveform(parent=self.fit_tab)
self.waveform_x.plot(
x=[0],
y=[1],
label="fit-x",
dap=["SineModel", "LinearModel"],
dap_parameters=[
{"frequency": {"value": 0.0174533, "vary": False, "min": 0.01, "max": 0.02}},
{"slope": {"value": 0, "vary": False, "min": 0.0, "max": 0.02}},
],
dap_oversample=5,
)
self.waveform_y.plot(
x=[0],
y=[2],
label="fit-y",
dap=["SineModel", "LinearModel"],
dap_parameters=[
{"frequency": {"value": 0.0174533, "vary": False, "min": 0.01, "max": 0.02}},
{"slope": {"value": 0, "vary": False, "min": 0.0, "max": 0.02}},
],
dap_oversample=5,
)
self.fit_x = self.waveform_x.curves[0]
self.fit_y = self.waveform_y.curves[0]
self.waveform_x.dap_params_update.connect(self.on_dap_params)
self.waveform_y.dap_params_update.connect(self.on_dap_params)
for wave in (self.waveform_x, self.waveform_y):
wave.x_label = "Angle (deg)"
wave.x_grid = True
wave.y_grid = True
wave.enable_toolbar = True
self.fit_layout.addWidget(self.waveform_x)
self.fit_layout.addWidget(self.waveform_y)
self.tab_widget.addTab(self.fit_tab, "Fit")
def _make_connections(self): def _make_connections(self):
# Fetch initial state # Fetch initial state
self.on_live_view_enabled(True) self.on_live_view_enabled(True)
@@ -327,36 +230,31 @@ class XRayEye(BECWidget, QWidget):
# Make connections # Make connections
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled) self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
self.step_size.valueChanged.connect( self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
lambda x: self.motor_control_2d.setProperty("step_size", x)
)
self.submit_button.clicked.connect(self.submit) self.submit_button.clicked.connect(self.submit)
def _create_separator(self): def _create_separator(self):
sep = QFrame(parent=self) sep = QFrame(parent=self)
sep.setFrameShape(QFrame.Shape.HLine) sep.setFrameShape(QFrame.HLine)
sep.setFrameShadow(QFrame.Shadow.Sunken) sep.setFrameShadow(QFrame.Sunken)
sep.setLineWidth(1) sep.setLineWidth(1)
return sep return sep
def _init_gui_trigger(self): def _init_gui_trigger(self):
self.dev.omny_xray_gui.read() self.dev.omny_xray_gui.read()
self.dev.omnyfsh.read()
################################################################################ ################################################################################
# Device Connection logic # Device Connection logic
################################################################################ ################################################################################
def connect_motors(self): def connect_motors(self):
"""Checks one of the possible motors for flomni, omny and lamni setup.""" """ Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ["osamroy", "lsamrot", "fsamroy"] possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
for motor in possible_motors: for motor in possible_motors:
if motor in self.dev: if motor in self.dev:
self.bec_dispatcher.connect_slot( self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor) logger.info(f"Succesfully connected to {motor}")
)
logger.info(f"Successfully connected to {motor}")
################################################################################ ################################################################################
# Properties ported from the original OmnyAlignment, can be adjusted as needed # Properties ported from the original OmnyAlignment, can be adjusted as needed
@@ -393,13 +291,6 @@ class XRayEye(BECWidget, QWidget):
# Slots ported from the original OmnyAlignment, can be adjusted as needed # Slots ported from the original OmnyAlignment, can be adjusted as needed
################################################################################ ################################################################################
@SafeSlot(str)
def switch_tab(self, tab: str):
if tab == "fit":
self.tab_widget.setCurrentIndex(1)
else:
self.tab_widget.setCurrentIndex(0)
@SafeSlot() @SafeSlot()
def get_roi_coordinates(self) -> dict | None: def get_roi_coordinates(self) -> dict | None:
"""Get the coordinates of the currently active ROI.""" """Get the coordinates of the currently active ROI."""
@@ -416,48 +307,14 @@ class XRayEye(BECWidget, QWidget):
self.live_preview_toggle.blockSignals(True) self.live_preview_toggle.blockSignals(True)
if enabled: if enabled:
self.live_preview_toggle.checked = enabled self.live_preview_toggle.checked = enabled
self.image.image(device=CAMERA[0], signal=CAMERA[1]) self.image.image(CAMERA)
self.live_preview_toggle.blockSignals(False) self.live_preview_toggle.blockSignals(False)
return return
self.image.disconnect_monitor(CAMERA[0], CAMERA[1]) self.image.disconnect_monitor(CAMERA)
self.live_preview_toggle.checked = enabled self.live_preview_toggle.checked = enabled
self.live_preview_toggle.blockSignals(False) self.live_preview_toggle.blockSignals(False)
@SafeSlot(bool)
def camera_running_enabled(self, enabled: bool):
logger.info(f"Camera running: {enabled}")
self.camera_running_toggle.blockSignals(True)
self.dev.get(CAMERA[0]).live_mode_enabled.put(enabled)
self.camera_running_toggle.checked = enabled
self.camera_running_toggle.blockSignals(False)
@SafeSlot(dict, dict)
def getting_camera_status(self, data, meta):
print(f"msg:{data}")
live_mode_enabled = data.get("signals").get(f"{CAMERA[0]}_live_mode_enabled").get("value")
self.camera_running_toggle.blockSignals(True)
self.camera_running_toggle.checked = live_mode_enabled
self.camera_running_toggle.blockSignals(False)
@SafeSlot(bool)
def opening_shutter(self, enabled: bool):
logger.info(f"Shutter changed from GUI to: {enabled}")
self.shutter_toggle.blockSignals(True)
if enabled:
self.dev.omnyfsh.fshopen()
else:
self.dev.omnyfsh.fshclose()
# self.shutter_toggle.checked = enabled
self.shutter_toggle.blockSignals(False)
@SafeSlot(dict, dict)
def getting_shutter_status(self, data, meta):
shutter_open = bool(data.get("signals").get("omnyfsh_shutter").get("value"))
self.shutter_toggle.blockSignals(True)
self.shutter_toggle.checked = shutter_open
self.shutter_toggle.blockSignals(False)
@SafeSlot(bool, bool) @SafeSlot(bool, bool)
def on_motors_enable(self, x_enable: bool, y_enable: bool): def on_motors_enable(self, x_enable: bool, y_enable: bool):
""" """
@@ -470,108 +327,98 @@ class XRayEye(BECWidget, QWidget):
self.motor_control_2d.enable_controls_hor(x_enable) self.motor_control_2d.enable_controls_hor(x_enable)
self.motor_control_2d.enable_controls_ver(y_enable) self.motor_control_2d.enable_controls_ver(y_enable)
@SafeSlot(bool) @SafeSlot(int)
def enable_submit_button(self, enable: bool): def enable_submit_button(self, enable: int):
""" """
Enable/disable submit button. Enable/disable submit button.
Args: Args:
enable(int): -1 disable else enable enable(int): -1 disable else enable
""" """
if enable: if enable == -1:
self.submit_button.setEnabled(True)
else:
self.submit_button.setEnabled(False) self.submit_button.setEnabled(False)
@SafeSlot(dict, dict)
def on_dap_params(self, data, meta):
print("#######################################")
print("getting dap parameters")
print(f"data: {data}")
print(f"meta: {meta}")
self.waveform_x.auto_range(True)
self.waveform_y.auto_range(True)
# self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
curve_id = meta.get("curve_id")
if curve_id == "fit-x-SineModel+LinearModel":
self.dev.omny_xray_gui.fit_params_x.set(data).wait()
print(f"setting x data to {data}")
else: else:
self.dev.omny_xray_gui.fit_params_y.set(data).wait() self.submit_button.setEnabled(True)
print(f"setting y data to {data}")
# self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
@SafeSlot(bool, bool) @SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict): def on_tomo_angle_readback(self, data: dict, meta: dict):
# TODO implement if needed #TODO implement if needed
print(f"data: {data}") print(f"data: {data}")
print(f"meta: {meta}") print(f"meta: {meta}")
@SafeSlot() @SafeSlot(dict, dict)
def submit_fit_array(self, fit_array): def device_updates(self, data: dict, meta: dict):
self.tab_widget.setCurrentIndex(1) """
# self.fix_x.title = " got fit array" Slot to handle device updates from omny_xray_gui device.
print(f"got fit array {fit_array}")
self.waveform_x.curves[0].set_data(x=fit_array[0], y=fit_array[1]) Args:
self.waveform_y.curves[0].set_data(x=fit_array[0], y=fit_array[2]) data(dict): data from device
# self.fit_x.set_data(x=fit_array[0],y=fit_array[1]) meta(dict): metadata from device
# self.fit_y.set_data(x=fit_array[0],y=fit_array[2]) """
signals = data.get('signals')
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
self.on_live_view_enabled(bool(enable_live_preview))
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
# Signals from epics gui device
# send message
user_message = signals.get("omny_xray_gui_send_message").get('value')
self.user_message = user_message
# sample name
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
self.sample_name = sample_message
# enable frame acquisition
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
self.on_live_view_enabled(bool(update_frame_acq))
# enable submit button
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
self.enable_submit_button(enable_submit_button)
@SafeSlot() @SafeSlot()
def submit(self): def submit(self):
"""Execute submit action by submit button.""" """Execute submit action by submit button."""
print("submit pushed")
self.submit_button.blockSignals(True)
if self.roi_manager.single_active_roi is None: if self.roi_manager.single_active_roi is None:
logger.warning("No active ROI") logger.warning("No active ROI")
return return
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates() roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
roi_center_x = roi_coordinates["center_x"] roi_center_x = roi_coordinates['center_x']
roi_center_y = roi_coordinates["center_y"] roi_center_y = roi_coordinates['center_y']
# Case of rectangular ROI # Case of rectangular ROI
if isinstance(self.roi_manager.single_active_roi, RectangularROI): if isinstance(self.roi_manager.single_active_roi, RectangularROI):
roi_width = roi_coordinates["width"] roi_width = roi_coordinates['width']
roi_height = roi_coordinates["height"] roi_height = roi_coordinates['height']
elif isinstance(self.roi_manager.single_active_roi, CircularROI): elif isinstance(self.roi_manager.single_active_roi, CircularROI):
roi_width = roi_coordinates["diameter"] roi_width = roi_coordinates['diameter']
roi_height = roi_coordinates["radius"] roi_height = roi_coordinates['radius']
else: else:
logger.warning("Unsupported ROI type for submit action.") logger.warning("Unsupported ROI type for submit action.")
return return
print( print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}"
) # TODO remove when will be not needed for debugging
# submit roi coordinates # submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get("value")) step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
xval_x = getattr(self.dev.omny_xray_gui, f"xval_x_{step}").set(roi_center_x) xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui, f"yval_y_{step}").set(roi_center_y) xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui, f"width_x_{step}").set(roi_width) width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui, f"width_y_{step}").set(roi_height) width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1) self.dev.omny_xray_gui.submit.set(1)
print("submit done")
self.submit_button.blockSignals(False)
def cleanup(self): def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera.""" """Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
self.bec_dispatcher.disconnect_slot( self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
self.device_updates, MessageEndpoints.device_readback("omny_xray_gui") getattr(self.dev,CAMERA[0]).live_mode = False
)
getattr(self.dev, CAMERA[0]).stop_live_mode()
super().cleanup() super().cleanup()
if __name__ == "__main__": if __name__ == "__main__":
import sys import sys
from qtpy.QtWidgets import QApplication from qtpy.QtWidgets import QApplication
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.colors import apply_theme
app = QApplication(sys.argv) app = QApplication(sys.argv)
apply_theme("light")
dispatcher = BECDispatcher(gui_id="xray")
win = XRayEye() win = XRayEye()
win.resize(1000, 800) win.resize(1000, 800)

View File

@@ -435,9 +435,9 @@ cam_xeye:
# deviceConfig: # deviceConfig:
# camera_id: 203 # camera_id: 203
# bits_per_pixel: 24 # bits_per_pixel: 24
# num_rotation_90: 2 # num_rotation_90: 3
# transpose: false # transpose: false
# force_monochrome: false # force_monochrome: true
# m_n_colormode: 1 # m_n_colormode: 1
# enabled: true # enabled: true
# onFailure: buffer # onFailure: buffer
@@ -472,7 +472,7 @@ omnyfsh:
############################################################ ############################################################
omny_xray_gui: omny_xray_gui:
description: Gui signals description: Gui signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayAlignGUI deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
deviceConfig: {} deviceConfig: {}
enabled: true enabled: true
onFailure: buffer onFailure: buffer

View File

@@ -1,161 +0,0 @@
"""Module for the EPICS integration of the AlliedVision Camera via Vimba SDK."""
import threading
import traceback
from enum import IntEnum
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt, Kind, Signal
from ophyd.areadetector import ADComponent as ADCpt
from ophyd.areadetector import DetectorBase
from ophyd_devices import PreviewSignal
from ophyd_devices.devices.areadetector.cam import VimbaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
logger = bec_logger.logger
class ACQUIRE_MODES(IntEnum):
"""Acquiring enums for Allied Vision Camera"""
ACQUIRING = 1
DONE = 0
class AlliedVisionCamera(PSIDeviceBase, DetectorBase):
"""
Epics Area Detector interface for the Allied Vision Alvium G1-507m camera via Vimba SDK.
The IOC runs with under the prefix: 'X12SA-GIGECAM-AV1:'.
"""
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
cam = ADCpt(VimbaDetectorCam, "cam1:")
image = ADCpt(ImagePlugin, "image1:")
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=0,
transpose=False,
doc="Preview signal of the AlliedVision camera.",
)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
def __init__(
self,
*,
name: str,
prefix: str,
poll_rate: int = 5,
num_rotation_90: int = 0,
transpose: bool = False,
scan_info=None,
device_manager=None,
**kwargs,
):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._poll_thread = threading.Thread(
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
self._poll_thread_kill_event = threading.Event()
self._poll_start_event = threading.Event()
if poll_rate <= 0:
logger.warning(
f"Poll rate must be positive for Camera {self.name} and non-zero, setting to 1 Hz."
)
poll_rate = 1
self.stop_live_mode()
elif poll_rate > 10:
logger.warning(f"Poll rate too high for Camera {self.name}, setting to 10 Hz max.")
poll_rate = 10
self._poll_rate = poll_rate
self._unique_array_id = 0
self._pv_timeout = 2.0
self.image: ImagePlugin
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
self._live_mode_lock = threading.RLock()
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
self.cam.acquire.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None:
"""Start live mode."""
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
"""Stop live mode."""
self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
self._apply_live_mode(bool(value))
def _apply_live_mode(self, enabled: bool) -> None:
with self._live_mode_lock:
if enabled:
if not self._poll_start_event.is_set():
self._poll_start_event.set()
self.cam.acquire.put(ACQUIRE_MODES.ACQUIRING.value) # Start acquisition
else:
logger.info(f"Live mode already started for {self.name}.")
return
if self._poll_start_event.is_set():
self._poll_start_event.clear()
self.cam.acquire.put(ACQUIRE_MODES.DONE.value) # Stop acquisition
else:
logger.info(f"Live mode already stopped for {self.name}.")
def on_connected(self):
"""Reset the unique array ID on connection."""
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout)
self._poll_thread.start()
def _poll_array_data(self):
"""Poll the array data for preview updates."""
while self._poll_start_event.wait():
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
try:
# First check if there is a new image
if self.image.unique_id.get() != self._unique_array_id:
self._unique_array_id = self.image.unique_id.get()
else:
continue # No new image, skip update
# Get new image data
value = self.image.array_data.get()
if value is None:
logger.info(f"No image data available for preview of {self.name}")
continue
array_size = self.image.array_size.get()
if array_size[0] == 0: # 2D image, not color image
array_size = array_size[1:]
# Geometry correction for the image
data = np.reshape(value, array_size)
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while polling array data for preview of {self.name}: {content}"
)
def on_destroy(self):
"""Stop the polling thread on destruction."""
self._poll_start_event.set()
self._poll_thread_kill_event.set()
if self._poll_thread.is_alive():
self._poll_thread.join(timeout=2)

View File

@@ -443,6 +443,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
while not self._scan_done_thread_kill_event.is_set(): while not self._scan_done_thread_kill_event.is_set():
while self._start_monitor_async_data_emission.wait(): while self._start_monitor_async_data_emission.wait():
try: try:
logger.debug(f"Monitoring async data emission for {self.name}...")
if ( if (
hasattr(self.scan_info.msg, "num_points") hasattr(self.scan_info.msg, "num_points")
and self.scan_info.msg.num_points is not None and self.scan_info.msg.num_points is not None
@@ -452,6 +453,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
for callback in self._scan_done_callbacks: for callback in self._scan_done_callbacks:
callback(exception=None) callback(exception=None)
else: else:
logger.info(f"Current data index is {self._current_data_index}")
if self._current_data_index >= 1: if self._current_data_index >= 1:
for callback in self._scan_done_callbacks: for callback in self._scan_done_callbacks:
callback(exception=None) callback(exception=None)

View File

@@ -156,6 +156,7 @@ class Camera:
camera_id (int): The ID of the camera device. camera_id (int): The ID of the camera device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera. m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera. bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
""" """
def __init__( def __init__(

View File

@@ -3,19 +3,21 @@
from __future__ import annotations from __future__ import annotations
import threading import threading
from typing import TYPE_CHECKING, Literal import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
import numpy as np import numpy as np
from ophyd import Component as Cpt, Signal, Kind
from bec_lib import messages from bec_lib import messages
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera from ophyd import Component as Cpt
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING: if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger logger = bec_logger.logger
@@ -43,15 +45,8 @@ class IDSCamera(PSIDeviceBase):
doc="Signal for the region of interest (ROI).", doc="Signal for the region of interest (ROI).",
async_update={"type": "add", "max_shape": [None]}, async_update={"type": "add", "max_shape": [None]},
) )
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
USER_ACCESS = ["start_live_mode", "stop_live_mode", "mask", "set_rect_roi", "get_last_image"] USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
def __init__( def __init__(
self, self,
@@ -88,22 +83,15 @@ class IDSCamera(PSIDeviceBase):
bits_per_pixel=bits_per_pixel, bits_per_pixel=bits_per_pixel,
connect=False, connect=False,
) )
self._live_mode = False
self._inputs = {"live_mode": live_mode} self._inputs = {"live_mode": live_mode}
self._mask = np.zeros((1, 1), dtype=np.uint8) self._mask = np.zeros((1, 1), dtype=np.uint8)
self.image.num_rotation_90 = num_rotation_90 self.image.num_rotation_90 = num_rotation_90
self.image.transpose = transpose self.image.transpose = transpose
self._force_monochrome = force_monochrome self._force_monochrome = force_monochrome
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
self.live_mode_enabled.put(bool(live_mode))
############## Live Mode Methods ############## ############## Live Mode Methods ##############
def start_live_mode(self) -> None:
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
self.live_mode_enabled.put(False)
@property @property
def mask(self) -> np.ndarray: def mask(self) -> np.ndarray:
"""Return the current region of interest (ROI) for the camera.""" """Return the current region of interest (ROI) for the camera."""
@@ -126,15 +114,22 @@ class IDSCamera(PSIDeviceBase):
) )
self._mask = value self._mask = value
def _on_live_mode_enabled_changed(self, *args, value, **kwargs): @property
"""Callback for when live mode is changed.""" def live_mode(self) -> bool:
enabled = bool(value) """Return whether the camera is in live mode."""
if enabled and self.cam._connected is False: # pylint: disable=protected-access return self._live_mode
self.cam.on_connect()
if enabled: @live_mode.setter
self._start_live() def live_mode(self, value: bool):
else: """Set the live mode for the camera."""
self._stop_live() if value != self._live_mode:
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
self._stop_live()
def set_rect_roi(self, x: int, y: int, width: int, height: int): def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera.""" """Set the rectangular region of interest (ROI) for the camera."""
@@ -201,7 +196,7 @@ class IDSCamera(PSIDeviceBase):
"""Connect to the camera.""" """Connect to the camera."""
self.cam.force_monochrome = self._force_monochrome self.cam.force_monochrome = self._force_monochrome
self.cam.on_connect() self.cam.on_connect()
self.live_mode_enabled.put(bool(self._inputs.get("live_mode", False))) self.live_mode = self._inputs.get("live_mode", False)
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value) self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
def on_destroy(self): def on_destroy(self):
@@ -211,7 +206,7 @@ class IDSCamera(PSIDeviceBase):
def on_trigger(self): def on_trigger(self):
"""Handle the trigger event.""" """Handle the trigger event."""
if not bool(self.live_mode_enabled.get()): if not self.live_mode:
return return
image = self.image.get() image = self.image.get()
if image is not None: if image is not None:

View File

@@ -2,7 +2,7 @@ import requests
import threading import threading
import cv2 import cv2
import numpy as np import numpy as np
from ophyd import Device, Component as Cpt, Kind, Signal from ophyd import Device, Component as Cpt
from ophyd_devices import PreviewSignal from ophyd_devices import PreviewSignal
import traceback import traceback
@@ -13,13 +13,6 @@ logger = bec_logger.logger
class WebcamViewer(Device): class WebcamViewer(Device):
USER_ACCESS = ["start_live_mode", "stop_live_mode"] USER_ACCESS = ["start_live_mode", "stop_live_mode"]
preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False) preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None: def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None:
super().__init__(name=name, **kwargs) super().__init__(name=name, **kwargs)
@@ -28,54 +21,20 @@ class WebcamViewer(Device):
self._update_thread = None self._update_thread = None
self._buffer = b"" self._buffer = b""
self._shutdown_event = threading.Event() self._shutdown_event = threading.Event()
self._live_mode_lock = threading.RLock()
self.preview.num_rotation_90 = num_rotation_90 self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose self.preview.transpose = transpose
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None: def start_live_mode(self) -> None:
self.live_mode_enabled.put(True) if self._connection is not None:
return
def stop_live_mode(self) -> None: self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self.live_mode_enabled.put(False) self._update_thread.start()
def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
self._apply_live_mode(bool(value))
def _apply_live_mode(self, enabled: bool) -> None:
with self._live_mode_lock:
if enabled:
if self._update_thread is not None and self._update_thread.is_alive():
return
self._shutdown_event.clear()
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
return
if self._update_thread is None:
return
self._shutdown_event.set()
if self._connection is not None:
try:
self._connection.close()
except Exception: # pylint: disable=broad-except
pass
self._connection = None
self._update_thread.join(timeout=2)
if self._update_thread.is_alive():
logger.warning("Webcam live mode thread did not stop within timeout.")
return
self._update_thread = None
self._buffer = b""
self._shutdown_event.clear()
def _update_loop(self) -> None: def _update_loop(self) -> None:
while not self._shutdown_event.is_set(): while not self._shutdown_event.is_set():
try: try:
self._connection = requests.get(self.url, stream=True, timeout=5) self._connection = requests.get(self.url, stream=True)
for chunk in self._connection.iter_content(chunk_size=1024): for chunk in self._connection.iter_content(chunk_size=1024):
if self._shutdown_event.is_set():
break
self._buffer += chunk self._buffer += chunk
start = self._buffer.find(b'\xff\xd8') # JPEG start start = self._buffer.find(b'\xff\xd8') # JPEG start
end = self._buffer.find(b'\xff\xd9') # JPEG end end = self._buffer.find(b'\xff\xd9') # JPEG end
@@ -91,3 +50,16 @@ class WebcamViewer(Device):
except Exception as exc: except Exception as exc:
content = traceback.format_exc() content = traceback.format_exc()
logger.error(f"Image update loop failed: {content}") logger.error(f"Image update loop failed: {content}")
def stop_live_mode(self) -> None:
if self._connection is None:
return
self._shutdown_event.set()
if self._connection is not None:
self._connection.close()
self._connection = None
if self._update_thread is not None:
self._update_thread.join()
self._update_thread = None
self._shutdown_event.clear()

View File

@@ -1,44 +1,74 @@
import numpy as np
from ophyd import Component as Cpt, Signal, EpicsSignal from ophyd import Component as Cpt
from ophyd import Device from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
class OMNYXRayAlignGUI(Device):
update_frame_acqdone = Cpt(Signal, value=0) class OMNYXRayEpicsGUI(Device):
update_frame_acq = Cpt(Signal, value=0)
enable_mv_x = Cpt(Signal, value=0)
enable_mv_y = Cpt(Signal, value=0)
send_message = Cpt(Signal, value=0)
sample_name = Cpt(Signal, value=0)
angle = Cpt(Signal, value=0)
pixel_size = Cpt(Signal, value=0)
submit = Cpt(EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0", auto_monitor=True)
step = Cpt(Signal, value=0)
recbg = Cpt(Signal, value=0)
mvx = Cpt(Signal, value=0)
mvy = Cpt(Signal, value=0)
fit_array = Cpt(Signal, value=np.zeros((3, 10))) save_frame = Cpt(
fit_params_x = Cpt(Signal, value=np.zeros((2, 3))) EpicsSignal, name="save_frame", read_pv="XOMNYI-XEYE-SAVFRAME:0",auto_monitor=True
fit_params_y = Cpt(Signal, value=np.zeros((2, 3))) )
update_frame_acqdone = Cpt(
EpicsSignal, name="update_frame_acqdone", read_pv="XOMNYI-XEYE-ACQDONE:0",auto_monitor=True
)
update_frame_acq = Cpt(
EpicsSignal, name="update_frame_acq", read_pv="XOMNYI-XEYE-ACQ:0",auto_monitor=True
)
width_y_dynamic = {
f"width_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YWIDTH_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_y = Dcpt(width_y_dynamic)
width_x_dynamic = {
f"width_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XWIDTH_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_x = Dcpt(width_x_dynamic)
enable_mv_x = Cpt(
EpicsSignal, name="enable_mv_x", read_pv="XOMNYI-XEYE-ENAMVX:0",auto_monitor=True
)
enable_mv_y = Cpt(
EpicsSignal, name="enable_mv_y", read_pv="XOMNYI-XEYE-ENAMVY:0",auto_monitor=True
)
send_message = Cpt(
EpicsSignal, name="send_message", read_pv="XOMNYI-XEYE-MESSAGE:0.DESC",auto_monitor=True
)
sample_name = Cpt(
EpicsSignal, name="sample_name", read_pv="XOMNYI-XEYE-SAMPLENAME:0.DESC",auto_monitor=True
)
angle = Cpt(
EpicsSignal, name="angle", read_pv="XOMNYI-XEYE-ANGLE:0",auto_monitor=True
)
pixel_size = Cpt(
EpicsSignal, name="pixel_size", read_pv="XOMNYI-XEYE-PIXELSIZE:0",auto_monitor=True
)
submit = Cpt(
EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True
)
step = Cpt(
EpicsSignal, name="step", read_pv="XOMNYI-XEYE-STEP:0",auto_monitor=True
)
xval_x_dynamic = {
f"xval_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XVAL_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
xval_x = Dcpt(xval_x_dynamic)
yval_y_dynamic = {
f"yval_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YVAL_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
yval_y = Dcpt(yval_y_dynamic)
recbg = Cpt(
EpicsSignal, name="recbg", read_pv="XOMNYI-XEYE-RECBG:0",auto_monitor=True
)
stage_pos_x_dynamic = {
f"stage_pos_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-STAGEPOSX:{i}", {"auto_monitor": True}) for i in range(1, 6)
}
stage_pos_x = Dcpt(stage_pos_x_dynamic)
mvx = Cpt(
EpicsSignal, name="mvx", read_pv="XOMNYI-XEYE-MVX:0",auto_monitor=True
)
mvy = Cpt(
EpicsSignal, name="mvy", read_pv="XOMNYI-XEYE-MVY:0",auto_monitor=True
)
# Generate width_y_0 to width_y_10
for i in range(11):
locals()[f"width_y_{i}"] = Cpt(Signal, value=0)
# Generate width_x_0 to width_x_10
for i in range(11):
locals()[f"width_x_{i}"] = Cpt(Signal, value=0)
# Generate xval_x_0 to xval_x_10
for i in range(11):
locals()[f"xval_x_{i}"] = Cpt(Signal, value=0)
# Generate yval_y_0 to yval_y_10
for i in range(11):
locals()[f"yval_y_{i}"] = Cpt(Signal, value=0)
# Generate stage_pos_x_1 to stage_pos_x_5
for i in range(1, 6):
locals()[f"stage_pos_x_{i}"] = Cpt(Signal, value=0)

View File

@@ -54,7 +54,7 @@ def test_on_connected_sets_mask_and_live_mode(ids_camera):
def test_on_trigger_roi_signal(ids_camera): def test_on_trigger_roi_signal(ids_camera):
"""Test the on_trigger method to ensure it processes the ROI signal correctly.""" """Test the on_trigger method to ensure it processes the ROI signal correctly."""
ids_camera.start_live_mode() ids_camera.live_mode = True
test_image = np.array([[2, 4], [6, 8]]) test_image = np.array([[2, 4], [6, 8]])
test_mask = np.array([[1, 0], [0, 1]], dtype=np.uint8) test_mask = np.array([[1, 0], [0, 1]], dtype=np.uint8)
ids_camera.mask = test_mask ids_camera.mask = test_mask

View File

@@ -229,22 +229,6 @@ def device_manager_mock():
"kwargs": {}, "kwargs": {},
}, },
), ),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="lsamrot",
action="rpc",
parameter={
"device": "lsamrot",
"func": "readback.get",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
),
messages.DeviceInstructionMessage( messages.DeviceInstructionMessage(
metadata={ metadata={
"readout_priority": "monitored", "readout_priority": "monitored",