Compare commits

..

7 Commits

Author SHA1 Message Date
x12sa
ae0ad918a3 feat: check all devices are enabled, if not try to enable
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m55s
2026-03-10 14:50:25 +01:00
x01dc
2c28bba8c6 added account check at startup
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m52s
2026-03-09 13:19:58 +01:00
x01dc
a36e96aaba renamings and no bl check in x01 network
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m52s
2026-03-09 13:10:38 +01:00
x01dc
11896a3900 moved bl checker, tomo_id, tomo_reconstruct to separate, sharable classes
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m53s
2026-03-08 19:50:22 +01:00
x01dc
5f4db23014 added light command to lamni
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m51s
2026-03-06 14:51:51 +01:00
x01dc
8ee588d138 guitools
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m50s
2026-03-06 14:36:12 +01:00
x01dc
241cc68d39 WIP guitools initial version 2026-03-06 14:35:54 +01:00
6 changed files with 723 additions and 144 deletions

View File

@@ -0,0 +1,188 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class LamniGuiToolsError(Exception):
pass
class LamniGuiTools:
def __init__(self):
self.text_box = None
self.progressbar = None
def set_client(self, client):
self.client = client
self.gui = self.client.gui
def lamnigui_show_gui(self):
if "lamni" in self.gui.windows:
self.gui.lamni.show()
else:
self.gui.new("lamni")
def lamnigui_stop_gui(self):
self.gui.lamni.hide()
def lamnigui_raise(self):
self.gui.lamni.raise_window()
def lamnigui_show_xeyealign(self):
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("xeyegui"):
self.lamnigui_remove_all_docks()
self.xeyegui = self.gui.lamni.new("xeyegui").new("XRayEye")
# start live
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
def _lamnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui,"lamni"):
if hasattr(self.gui.lamni,attribute_name):
return False
return True
def lamnigui_remove_all_docks(self):
self.gui.lamni.delete_all()
self.progressbar = None
self.text_box = None
def lamnigui_idle(self):
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("idle_text_box"):
self.lamnigui_remove_all_docks()
idle_text_box = self.gui.lamni.new("idle_textbox").new("TextBox")
text = (
"<pre>"
+ "██████╗ ███████╗ ██████╗ ██╗ █████╗ ███╗ ███╗███╗ ██╗██╗\n"
+ "██╔══██╗██╔════╝██╔════╝ ██║ ██╔══██╗████╗ ████║████╗ ██║██║\n"
+ "██████╔╝█████╗ ██║ ██║ ███████║██╔████╔██║██╔██╗ ██║██║\n"
+ "██╔══██╗██╔══╝ ██║ ██║ ██╔══██║██║╚██╔╝██║██║╚██╗██║██║\n"
+ "██████╔╝███████╗╚██████╗ ███████╗██║ ██║██║ ╚═╝ ██║██║ ╚████║██║\n"
+ "╚═════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝\n"
+ "</pre>"
)
idle_text_box.set_html_text(text)
def lamnigui_docs(self, filename: str | None = None):
import csaxs_bec
from pathlib import Path
print("The general lamni documentation is at \nhttps://sls-csaxs.readthedocs.io/en/latest/user/ptychography/lamni.html#user-ptychography-lamni")
csaxs_bec_basepath = Path(csaxs_bec.__file__).parent
docs_folder = (
csaxs_bec_basepath /
"bec_ipython_client" / "plugins" / "lamni" / "docs"
)
if not docs_folder.is_dir():
raise NotADirectoryError(f"Docs folder not found: {docs_folder}")
pdfs = sorted(docs_folder.glob("*.pdf"))
if not pdfs:
raise FileNotFoundError(f"No PDF files found in {docs_folder}")
# --- Resolve PDF ------------------------------------------------------
if filename is not None:
pdf_file = docs_folder / filename
if not pdf_file.exists():
raise FileNotFoundError(f"Requested file not found: {filename}")
else:
print("\nAvailable lamni documentation PDFs:\n")
for i, pdf in enumerate(pdfs, start=1):
print(f" {i:2d}) {pdf.name}")
print()
while True:
try:
choice = int(input(f"Select a file (1{len(pdfs)}): "))
if 1 <= choice <= len(pdfs):
pdf_file = pdfs[choice - 1]
break
print(f"Enter a number between 1 and {len(pdfs)}.")
except ValueError:
print("Invalid input. Please enter a number.")
# --- GUI handling (active existence check) ----------------------------
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("PdfViewerWidget"):
self.lamnigui_remove_all_docks()
self.pdf_viewer = self.gui.lamni.new(widget="PdfViewerWidget")
# --- Load PDF ---------------------------------------------------------
self.pdf_viewer.PdfViewerWidget.load_pdf(str(pdf_file.resolve()))
print(f"\nLoaded: {pdf_file.name}\n")
def _lamnicam_check_device_exists(self, device):
try:
device
except:
return False
else:
return True
def lamnigui_show_progress(self):
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("progressbar"):
self.lamnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.lamni.new("progressbar").new("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
self.progressbar.enable_auto_updates(False)
# Set precision for the self.progressbar display
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
# Setting multiple rigns with different values
self.progressbar.set_number_of_bars(3)
self.progressbar.rings[0].set_update("manual")
self.progressbar.rings[1].set_update("manual")
self.progressbar.rings[2].set_update("scan")
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.lamni.new(name="progress_text").new("TextBox")
self._lamnigui_update_progress()
def _lamnigui_update_progress(self):
if self.progressbar is not None:
progress = self.progress["projection"] / self.progress["total_projections"] * 100
subtomo_progress = (
self.progress["subtomo_projection"]
/ self.progress["subtomo_total_projections"]
* 100
)
self.progressbar.set_value([progress, subtomo_progress, 0])
if self.text_box is not None:
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
self.text_box.set_plain_text(text)
if __name__ == "__main__":
from bec_lib.client import BECClient
from bec_widgets.cli.client_utils import BECGuiClient
client = BECClient()
client.start()
client.gui = BECGuiClient()
lamni_gui = LamniGuiTools(client)
lamni_gui.lamnigui_show_gui()
lamni_gui.lamnigui_show_progress()

View File

@@ -2,7 +2,6 @@ import builtins
import datetime
import os
import subprocess
import threading
import time
from pathlib import Path
@@ -12,7 +11,13 @@ from bec_lib.alarm_handler import AlarmBase
from bec_lib.pdf_writer import PDFWriter
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
BeamlineChecker,
OMNYTools,
PtychoReconstructor,
TomoIDManager,
)
from csaxs_bec.bec_ipython_client.plugins.LamNI.gui_tools import LamniGuiTools
from .alignment import XrayEyeAlign
from .lamni_optics_mixin import LaMNIInitStages, LamNIOpticsMixin
@@ -26,26 +31,26 @@ if builtins.__dict__.get("bec") is not None:
umvr = builtins.__dict__.get("umvr")
class LamNI(LamNIOpticsMixin):
class LamNI(LamNIOpticsMixin, LamniGuiTools):
def __init__(self, client):
super().__init__()
self.client = client
self.device_manager = client.device_manager
self.align = XrayEyeAlign(client, self)
self.init = LaMNIInitStages(client)
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
self._check_msgs = []
# Extracted collaborators
self.bl_chk = BeamlineChecker(client)
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
self.OMNYTools = OMNYTools(self.client)
self.tomo_id = -1
self.special_angles = []
self.special_angle_repeats = 20
self.special_angle_tolerance = 20
self._current_special_angles = []
self._beam_is_okay = True
self._stop_beam_check_event = None
self.beam_check_thread = None
self.OMNYTools = OMNYTools(self.client)
# Progress tracking
self.progress = {}
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
@@ -57,104 +62,19 @@ class LamNI(LamNIOpticsMixin):
self.progress["angle"] = 0
# ------------------------------------------------------------------
# Beamline checks
# Beamline checks — delegated to BeamlineChecker
# ------------------------------------------------------------------
def get_beamline_checks_enabled(self):
print(
f"Shutter: {self.check_shutter}\nFOFB: {self.check_fofb}\nLight available:"
f" {self.check_light_available}"
)
@property
def beamline_checks_enabled(self):
return {
"shutter": self.check_shutter,
"fofb": self.check_fofb,
"light available": self.check_light_available,
}
return self.bl_chk.checks_enabled
@beamline_checks_enabled.setter
def beamline_checks_enabled(self, val: bool):
self.check_shutter = val
self.check_light_available = val
self.check_fofb = val
self.get_beamline_checks_enabled()
self.bl_chk.checks_enabled = val
def _run_beamline_checks(self):
msgs = []
dev = builtins.__dict__.get("dev")
try:
if self.check_shutter:
shutter_val = dev.x12sa_es1_shutter_status.read(cached=True)
if shutter_val["value"].lower() != "open":
self._beam_is_okay = False
msgs.append("Check beam failed: Shutter is closed.")
if self.check_light_available:
machine_status = dev.sls_machine_status.read(cached=True)
if machine_status["value"] not in ["Light Available", "Light-Available"]:
self._beam_is_okay = False
msgs.append("Check beam failed: Light not available.")
if self.check_fofb:
fast_orbit_feedback = dev.sls_fast_orbit_feedback.read(cached=True)
if fast_orbit_feedback["value"] != "running":
self._beam_is_okay = False
msgs.append("Check beam failed: Fast orbit feedback is not running.")
except Exception:
logger.warning("Failed to check beam.")
return msgs
def _check_beam(self):
while not self._stop_beam_check_event.is_set():
self._check_msgs = self._run_beamline_checks()
if not self._beam_is_okay:
self._stop_beam_check_event.set()
time.sleep(1)
def _start_beam_check(self):
self._beam_is_okay = True
self._stop_beam_check_event = threading.Event()
self.beam_check_thread = threading.Thread(target=self._check_beam, daemon=True)
self.beam_check_thread.start()
def _was_beam_okay(self):
self._stop_beam_check_event.set()
self.beam_check_thread.join()
return self._beam_is_okay
def _print_beamline_checks(self):
for msg in self._check_msgs:
logger.warning(msg)
def _wait_for_beamline_checks(self):
self._print_beamline_checks()
try:
msg = bec.logbook.LogbookMessage()
msg.add_text(
"<p><mark class='pen-red'><strong>Beamline checks failed at"
f" {str(datetime.datetime.now())}: {''.join(self._check_msgs)}</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send update to SciLog.")
while True:
self._beam_is_okay = True
self._check_msgs = self._run_beamline_checks()
if self._beam_is_okay:
break
self._print_beamline_checks()
time.sleep(1)
try:
msg = bec.logbook.LogbookMessage()
msg.add_text(
"<p><mark class='pen-red'><strong>Operation resumed at"
f" {str(datetime.datetime.now())}.</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send update to SciLog.")
def get_beamline_checks_enabled(self):
self.bl_chk.print_status()
# ------------------------------------------------------------------
# Special angles
@@ -212,7 +132,6 @@ class LamNI(LamNIOpticsMixin):
def feedback_status(self):
self.device_manager.devices.rtx.controller.show_feedback_status()
def show_interferometer_positions(self):
self.device_manager.devices.rtx.controller.show_feedback_status()
@@ -222,6 +141,12 @@ class LamNI(LamNIOpticsMixin):
def show_analog_signals(self):
return self.device_manager.devices.rtx.controller.show_analog_signals()
def lights_off(self):
self.device_manager.devices.lsamx.controller.lights_off()
def lights_on(self):
self.device_manager.devices.lsamx.controller.lights_on()
# ------------------------------------------------------------------
# Global parameters (backed by BEC global vars)
# ------------------------------------------------------------------
@@ -371,6 +296,7 @@ class LamNI(LamNIOpticsMixin):
@ptycho_reconstruct_foldername.setter
def ptycho_reconstruct_foldername(self, val: str):
self.client.set_global_var("ptycho_reconstruct_foldername", val)
self.reconstructor.folder_name = val # keep reconstructor in sync
@property
def tomo_angle_stepsize(self):
@@ -491,23 +417,22 @@ class LamNI(LamNIOpticsMixin):
)
# ------------------------------------------------------------------
# Sample database
# Sample database — delegated to TomoIDManager in omny general tools
# ------------------------------------------------------------------
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
):
"""Add a sample to the OMNY sample database and retrieve the tomo id."""
subprocess.run(
"wget --user=omny --password=samples -q -O /tmp/currsamplesnr.txt"
f" 'https://omny.web.psi.ch/samples/newmeasurement.php?sample={samplename}"
f"&date={date}&eaccount={eaccount}&scannr={scan_number}&setup={setup}"
f"&additional={sample_additional_info}&user={user}'",
shell=True,
return self.tomo_id_manager.register(
sample_name=samplename,
date=date,
eaccount=eaccount,
scan_number=scan_number,
setup=setup,
additional_info=sample_additional_info,
user=user,
)
with open("/tmp/currsamplesnr.txt") as tomo_number_file:
tomo_number = int(tomo_number_file.read())
return tomo_number
# ------------------------------------------------------------------
# Scan projection
@@ -524,7 +449,6 @@ class LamNI(LamNIOpticsMixin):
for stitch_x in range(-self.lamni_stitch_x, self.lamni_stitch_x + 1):
for stitch_y in range(-self.lamni_stitch_y, self.lamni_stitch_y + 1):
# pylint: disable=undefined-variable
self._current_scan_list.append(bec.queue.next_scan_number)
log_message = (
f"{str(datetime.datetime.now())}: LamNI scan projection at angle {angle},"
@@ -562,18 +486,11 @@ class LamNI(LamNIOpticsMixin):
def tomo_reconstruct(self, base_path="~/Data10/specES1"):
"""Write the tomo reconstruct file for the reconstruction queue."""
bec = builtins.__dict__.get("bec")
base_path = os.path.expanduser(base_path)
ptycho_queue_path = Path(os.path.join(base_path, self.ptycho_reconstruct_foldername))
ptycho_queue_path.mkdir(parents=True, exist_ok=True)
last_scan_number = bec.queue.next_scan_number - 1
ptycho_queue_file = os.path.abspath(
os.path.join(ptycho_queue_path, f"scan_{last_scan_number:05d}.dat")
self.reconstructor.write(
scan_list=self._current_scan_list,
next_scan_number=bec.queue.next_scan_number,
base_path=base_path,
)
with open(ptycho_queue_file, "w") as queue_file:
scans = " ".join([str(scan) for scan in self._current_scan_list])
queue_file.write(f"p.scan_number {scans}\n")
queue_file.write("p.check_nextscan_started 1\n")
def _at_each_angle(self, angle: float) -> None:
self.tomo_scan_projection(angle)
@@ -635,7 +552,7 @@ class LamNI(LamNIOpticsMixin):
print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
self._start_beam_check()
self.bl_chk.start()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
@@ -662,10 +579,10 @@ class LamNI(LamNIOpticsMixin):
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
if self._was_beam_okay() and not error_caught:
if self.bl_chk.stop() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
self.bl_chk.wait_until_recovered()
def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False):
"""Return the ii-th golden ratio angle within sorted bunches and its subtomo number."""
@@ -1009,5 +926,4 @@ class LamNI(LamNIOpticsMixin):
msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name]
)
self.client.logbook.send_logbook_message(msg)
self.client.logbook.send_logbook_message(msg)

View File

@@ -51,7 +51,7 @@ class LaMNIInitStages:
self.drive_axis_to_limit(dev.lsamrot, "forward")
dev.lsamrot.enabled = False
print("Now hard reboot the controller and run the initialization routine again.")
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
print("Remark: The controller will be disabled in bec. It will be enabled by running the init route, \nbut in case needed, to enable manually set dev.lsamrot.enabled=True")
return
if self.OMNYTools.yesno(

View File

@@ -37,6 +37,92 @@ class cSAXSInitSmaractStages:
# ------------------------------
# Internal helpers (runtime-based)
# ------------------------------
def _ensure_all_session_devices_enabled(self, selection: set | None = None, try_enable: bool = True):
"""
Ensure all session devices (or a selection) that define 'bl_smar_stage' are enabled.
Parameters
----------
selection : set | None
If provided, only devices in this set are considered.
try_enable : bool
If True, attempt to set device.enabled = True for devices that expose 'enabled' and are False.
If False, only report status without changing it.
Returns
-------
dict
{
"enabled_now": [device_names enabled by this call],
"already_enabled": [device_names already enabled or without 'enabled' attr],
"failed": [device_names that could not be enabled],
"inaccessible": [device_names not accessible]
}
"""
enabled_now = []
already_enabled = []
failed = []
inaccessible = []
# Build axis map to restrict to SmarAct-based devices (same logic as other helpers)
axis_map = self._build_session_axis_map(selection=selection)
for dev_name in sorted(axis_map.keys()):
try:
d = self._get_device_object(dev_name)
if d is None:
inaccessible.append(dev_name)
logger.warning(f"[cSAXS] Device {dev_name} not accessible.")
continue
# If device has no 'enabled' attribute, treat as already enabled/usable
if not hasattr(d, "enabled"):
already_enabled.append(dev_name)
continue
# If already enabled
try:
if getattr(d, "enabled"):
already_enabled.append(dev_name)
continue
except Exception:
# If reading enabled fails, treat as inaccessible for safety
failed.append(dev_name)
logger.warning(f"[cSAXS] Could not read 'enabled' for {dev_name}.")
continue
# Device exists and is disabled
if try_enable:
try:
logger.info(f"[cSAXS] Enabling device {dev_name} (was disabled).")
setattr(d, "enabled", True)
# small delay to let device initialize if needed
time.sleep(0.05)
if getattr(d, "enabled"):
enabled_now.append(dev_name)
logger.info(f"[cSAXS] Device {dev_name} enabled.")
else:
failed.append(dev_name)
logger.warning(f"[cSAXS] Device {dev_name} still disabled after enabling attempt.")
except Exception as exc:
failed.append(dev_name)
logger.error(f"[cSAXS] Failed to enable {dev_name}: {exc}")
else:
# Not trying to enable, just report
failed.append(dev_name)
except Exception as exc:
failed.append(dev_name)
logger.error(f"[cSAXS] _ensure_all_session_devices_enabled error for {dev_name}: {exc}")
return {
"enabled_now": enabled_now,
"already_enabled": already_enabled,
"failed": failed,
"inaccessible": inaccessible,
}
def _yesno(self, question: str, default: str = "y") -> bool:
"""
Use OMNYTools.yesno if available; otherwise default to 'yes' (or fallback to input()).
@@ -107,6 +193,7 @@ class cSAXSInitSmaractStages:
# ------------------------------
# Public API
# ------------------------------
def smaract_reference_stages(self, force: bool = False, devices_to_reference=None):
"""
Reference SmarAct stages using runtime discovery.
@@ -167,6 +254,19 @@ class cSAXSInitSmaractStages:
devices_to_reference = [devices_to_reference]
selection = set(devices_to_reference) if devices_to_reference else None
# First: ensure all relevant devices are enabled before attempting referencing
enable_report = self._ensure_all_session_devices_enabled(selection=selection, try_enable=True)
if enable_report["failed"]:
logger.warning(
"[cSAXS] Some devices could not be enabled before referencing: "
+ ", ".join(sorted(enable_report["failed"]))
)
if enable_report["inaccessible"]:
logger.warning(
"[cSAXS] Some devices were inaccessible before referencing: "
+ ", ".join(sorted(enable_report["inaccessible"]))
)
# Build axis map for selected devices (or all devices present)
axis_map = self._build_session_axis_map(selection=selection)
if selection:
@@ -174,7 +274,6 @@ class cSAXSInitSmaractStages:
if unknown:
print(f"Unknown devices requested or missing 'bl_smar_stage' (ignored): {unknown}")
newly_referenced = []
already_referenced = []
failed = []
@@ -191,6 +290,17 @@ class cSAXSInitSmaractStages:
failed.append(dev_name)
continue
# If device exposes 'enabled' and is False, skip (we already tried enabling above)
try:
if hasattr(d, "enabled") and not getattr(d, "enabled"):
print(f"{dev_name}: device disabled, skipping.")
failed.append(dev_name)
continue
except Exception:
print(f"{dev_name}: could not read enabled state, skipping.")
failed.append(dev_name)
continue
try:
is_ref = d.controller.axis_is_referenced(ch)
@@ -246,7 +356,17 @@ class cSAXSInitSmaractStages:
def smaract_check_all_referenced(self):
"""
Check reference state for all SmarAct devices that define 'bl_smar_stage'.
This now enables all relevant devices first (attempt), then performs the checks.
"""
# Attempt to enable all relevant devices first (do not force enabling if you prefer)
enable_report = self._ensure_all_session_devices_enabled(selection=None, try_enable=True)
if enable_report["enabled_now"]:
print("Now enabled devices which were disabled before: " + ", ".join(sorted(enable_report["enabled_now"])))
if enable_report["failed"]:
print("Could not enable: " + ", ".join(sorted(enable_report["failed"])))
if enable_report["inaccessible"]:
print("Inaccessible: " + ", ".join(sorted(enable_report["inaccessible"])))
axis_map = self._build_session_axis_map(selection=None)
for dev_name in sorted(axis_map.keys()):
ch = axis_map[dev_name]
@@ -254,6 +374,16 @@ class cSAXSInitSmaractStages:
if d is None:
print(f"{dev_name}: device not accessible or unsupported.")
continue
# Skip devices that expose 'enabled' and are False
try:
if hasattr(d, "enabled") and not getattr(d, "enabled"):
print(f"{dev_name} (axis {ch}) is disabled; skipping reference check.")
continue
except Exception:
print(f"{dev_name} (axis {ch}) enabled-state unknown; skipping.")
continue
try:
if d.controller.axis_is_referenced(ch):
print(f"{dev_name} (axis {ch}) is referenced.")
@@ -262,6 +392,7 @@ class cSAXSInitSmaractStages:
except Exception as e:
print(f"Error checking {dev_name} (axis {ch}): {e}")
def smaract_components_to_initial_position(self, devices_to_move=None):
"""
Move selected (or all) SmarAct-based components to their configured init_position.

View File

@@ -1,23 +1,29 @@
import time
import numpy as np
import sys
import termios
import tty
import builtins
import datetime
import fcntl
import os
import builtins
import subprocess
import sys
import termios
import threading
import time
import tty
from pathlib import Path
import numpy as np
from bec_lib import bec_logger
from rich import box
from rich.console import Console
from rich.table import Table
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
@@ -110,24 +116,20 @@ class OMNYTools:
next1, next2 = sys.stdin.read(2)
if next1 == "[":
if next2 == "A":
# print("up")
if dev2 != "none":
umvr(dev2, step2)
if special_command != "none":
special_command()
elif next2 == "B":
# print(" down")
if dev2 != "none":
umvr(dev2, -step2)
if special_command != "none":
special_command()
elif next2 == "C":
# print("right")
umvr(dev1, step1)
if special_command != "none":
special_command()
elif next2 == "D":
# print("left")
umvr(dev1, -step1)
if special_command != "none":
special_command()
@@ -143,13 +145,324 @@ class OMNYTools:
step2 = step2 / 2
print(f"\rHalf step size. New step size: {step1}, {step2}\r")
except IOError:
# No input available, keep looping
pass
# Sleep for a short period to avoid high CPU usage
time.sleep(0.02)
finally:
# Restore the terminal to its original state
termios.tcsetattr(fd, termios.TCSADRAIN, old_term)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)
import socket
class BeamlineChecker:
"""Monitors beamline health during scans.
Runs checks in a background thread and blocks scan progress
until beam conditions are restored if they fail.
Usage:
checker = BeamlineChecker(client)
checker._bl_chk_start()
# ... run scan ...
beam_was_ok = checker._bl_chk_stop()
if not beam_was_ok:
checker._bl_chk_wait_until_recovered()
"""
def __init__(self, client):
self.client = client
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
self._beam_is_okay = True
self._stop_event = None
self._thread = None
self._local_network_warned = False
self._check_msgs = []
# ------------------------------------------------------------------
# Public control interface
# ------------------------------------------------------------------
def bl_chk_status(self):
"""Print and return the current enabled/disabled state of all checks."""
if self._is_local_network():
print("Beamline checks cannot be performed on this network (129.129.98.x) — skipping.")
return {}
status = {
"shutter": self.check_shutter,
"fofb": self.check_fofb,
"light available": self.check_light_available,
}
print(
f"Shutter: {self.check_shutter}\n"
f"FOFB: {self.check_fofb}\n"
f"Light available: {self.check_light_available}"
)
return status
def bl_chk_enable_all(self):
"""Enable all beamline checks."""
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
self.bl_chk_status()
def bl_chk_disable_all(self):
"""Disable all beamline checks."""
self.check_shutter = False
self.check_light_available = False
self.check_fofb = False
self.bl_chk_status()
def bl_chk_enable_shutter(self):
"""Enable the shutter check."""
self.check_shutter = True
self.bl_chk_status()
def bl_chk_disable_shutter(self):
"""Disable the shutter check."""
self.check_shutter = False
self.bl_chk_status()
def bl_chk_enable_fofb(self):
"""Enable the fast orbit feedback check."""
self.check_fofb = True
self.bl_chk_status()
def bl_chk_disable_fofb(self):
"""Disable the fast orbit feedback check."""
self.check_fofb = False
self.bl_chk_status()
def bl_chk_enable_light(self):
"""Enable the light available check."""
self.check_light_available = True
self.bl_chk_status()
def bl_chk_disable_light(self):
"""Disable the light available check."""
self.check_light_available = False
self.bl_chk_status()
def _bl_chk_start(self):
"""Start the background beam check thread."""
self._beam_is_okay = True
self._stop_event = threading.Event()
self._thread = threading.Thread(target=self._poll, daemon=True)
self._thread.start()
def _bl_chk_stop(self) -> bool:
"""Stop the background thread and return whether beam was okay throughout."""
self._stop_event.set()
self._thread.join()
return self._beam_is_okay
def _bl_chk_wait_until_recovered(self):
"""Block until all beamline checks pass again, logging to SciLog."""
self._log_failure_to_scilog()
while True:
self._beam_is_okay = True
self._check_msgs = self._run_checks()
if self._beam_is_okay:
break
self._print_msgs()
time.sleep(1)
self._log_recovery_to_scilog()
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _is_local_network(self) -> bool:
"""Return True if running on the 129.129.98.x subnet."""
try:
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
return ip.startswith("129.129.98.")
except Exception:
return False
def _run_checks(self) -> list:
if self._is_local_network():
if not self._local_network_warned:
print("Beamline checks cannot be performed on this network (129.129.98.x) — skipping.")
self._local_network_warned = True
return []
msgs = []
dev = builtins.__dict__.get("dev")
try:
if self.check_shutter:
val = dev.x12sa_es1_shutter_status.read(cached=True)
if val["value"].lower() != "open":
self._beam_is_okay = False
msgs.append("Check beam failed: Shutter is closed.")
if self.check_light_available:
val = dev.sls_machine_status.read(cached=True)
if val["value"] not in ["Light Available", "Light-Available"]:
self._beam_is_okay = False
msgs.append("Check beam failed: Light not available.")
if self.check_fofb:
val = dev.sls_fast_orbit_feedback.read(cached=True)
if val["value"] != "running":
self._beam_is_okay = False
msgs.append("Check beam failed: Fast orbit feedback is not running.")
except Exception:
logger.warning("Failed to check beam.")
return msgs
def _poll(self):
while not self._stop_event.is_set():
self._check_msgs = self._run_checks()
if not self._beam_is_okay:
self._stop_event.set()
time.sleep(1)
def _print_msgs(self):
for msg in self._check_msgs:
logger.warning(msg)
def _log_failure_to_scilog(self):
self._print_msgs()
try:
bec = builtins.__dict__.get("bec")
msg = bec.logbook.LogbookMessage()
msg.add_text(
"<p><mark class='pen-red'><strong>Beamline checks failed at"
f" {str(datetime.datetime.now())}: {''.join(self._check_msgs)}</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send beam failure update to SciLog.")
def _log_recovery_to_scilog(self):
try:
bec = builtins.__dict__.get("bec")
msg = bec.logbook.LogbookMessage()
msg.add_text(
"<p><mark class='pen-red'><strong>Operation resumed at"
f" {str(datetime.datetime.now())}.</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send beam recovery update to SciLog.")
class PtychoReconstructor:
"""Writes ptychography reconstruction queue files after each scan projection.
An external reconstruction engine monitors the queue folder and picks
up .dat files as they are written.
Usage:
reconstructor = PtychoReconstructor(folder_name="reconstruction_queue")
reconstructor.write(
scan_list=[1023, 1024],
next_scan_number=1025,
base_path="~/data/raw",
)
"""
def __init__(self, folder_name: str = "reconstruction_queue"):
self.folder_name = folder_name
def _accounts_match(self) -> bool:
"""Check if bec.active_account matches the current system user (p vs e prefix)."""
try:
bec = builtins.__dict__.get("bec")
active = bec.active_account # e.g. "p23092"
system_user = os.getenv("USER") or os.getlogin() # e.g. "e23092"
print(f"Active server account {active}, BEC client account {system_user}.")
return active[1:] == system_user[1:]
except Exception:
logger.warning("Failed to compare active account to system user.")
return False
def write(self, scan_list: list, next_scan_number: int, base_path: str = "~/data/raw/analysis/"):
"""Write a reconstruction queue file for the given scan list.
Args:
scan_list (list): Scan numbers belonging to this projection
(may contain multiple entries when stitching).
next_scan_number (int): The current next scan number, used to
name the queue file.
base_path (str): Root path under which the queue folder lives.
"""
base_path = os.path.expanduser(base_path)
queue_path = Path(os.path.join(base_path, self.folder_name))
queue_path.mkdir(parents=True, exist_ok=True)
last_scan_number = next_scan_number - 1
queue_file = os.path.abspath(
os.path.join(queue_path, f"scan_{last_scan_number:05d}.dat")
)
with open(queue_file, "w") as f:
scans = " ".join(str(s) for s in scan_list)
f.write(f"p.scan_number {scans}\n")
f.write("p.check_nextscan_started 1\n")
class TomoIDManager:
"""Registers a tomography measurement in the OMNY sample database
and returns its assigned tomo ID.
Usage:
id_manager = TomoIDManager()
tomo_id = id_manager.register(
sample_name="my_sample",
date="2024-03-08",
eaccount="e12345",
scan_number=1001,
setup="lamni",
additional_info="test info",
user="BEC",
)
"""
OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
OMNY_USER = "omny"
OMNY_PASSWORD = "samples"
TMP_FILE = "/tmp/currsamplesnr.txt"
def register(
self,
sample_name: str,
date: str,
eaccount: str,
scan_number: int,
setup: str,
additional_info: str,
user: str,
) -> int:
"""Register a new measurement and return the assigned tomo ID.
Args:
sample_name (str): Name of the sample.
date (str): Date string (e.g. "2024-03-08").
eaccount (str): E-account identifier.
scan_number (int): First scan number of the measurement.
setup (str): Setup name (e.g. "lamni").
additional_info (str): Any additional sample information.
user (str): User name.
Returns:
int: The tomo ID assigned by the OMNY database.
"""
url = (
f"{self.OMNY_URL}"
f"?sample={sample_name}"
f"&date={date}"
f"&eaccount={eaccount}"
f"&scannr={scan_number}"
f"&setup={setup}"
f"&additional={additional_info}"
f"&user={user}"
)
subprocess.run(
f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
f" -q -O {self.TMP_FILE} '{url}'",
shell=True,
)
with open(self.TMP_FILE) as f:
return int(f.read())

View File

@@ -111,3 +111,34 @@ bec._beamline_mixin._bl_info_register(OperatorInfo)
# SETUP PROMPTS
bec._ip.prompts.session_name = _session_name
bec._ip.prompts.status = 1
# ACCOUNT MISMATCH CHECK
import os
def _check_account_mismatch():
try:
active = bec.active_account # e.g. "p23092"
system_user = os.getenv("USER") or os.getlogin() # e.g. "e23092"
if active[1:] != system_user[1:]:
print(f"""
\033[91m\033[1m
██╗ ██╗ █████╗ ██████╗ ███╗ ██╗██╗███╗ ██╗ ██████╗
██║ ██║██╔══██╗██╔══██╗████╗ ██║██║████╗ ██║██╔════╝
██║ █╗ ██║███████║██████╔╝██╔██╗ ██║██║██╔██╗ ██║██║ ███╗
██║███╗██║██╔══██║██╔══██╗██║╚██╗██║██║██║╚██╗██║██║ ██║
╚███╔███╔╝██║ ██║██║ ██║██║ ╚████║██║██║ ╚████║╚██████╔╝
╚══╝╚══╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝╚═╝ ╚═══╝ ╚═════╝
ACCOUNT MISMATCH DETECTED!
BEC active account : {active}
System user : {system_user}
Data read and written by the BEC client does not match the data account!
Please verify you are logged in with the correct account.
\033[0m""")
except Exception:
logger.warning("Failed to verify account match.")
if _args.session.lower() == "lamni" or _args.session.lower() == "flomni" or _args.session.lower() == "omny":
_check_account_mismatch()