Compare commits

...

47 Commits

Author SHA1 Message Date
ef03e88ad2 fix(x_ray_eye): added 20s rpc timeout to most rpc calls
All checks were successful
CI for csaxs_bec / test (push) Successful in 2m0s
CI for csaxs_bec / test (pull_request) Successful in 1m58s
2026-03-12 13:32:27 +01:00
1829b76458 fix(gui_tools): flomni gui tools timeout optimization
All checks were successful
CI for csaxs_bec / test (push) Successful in 2m1s
CI for csaxs_bec / test (pull_request) Successful in 2m1s
2026-03-12 11:36:15 +01:00
x12sa
97d62f2f0b feat: check all devices are enabled, if not try to enable
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m59s
CI for csaxs_bec / test (push) Successful in 1m59s
2026-03-12 11:30:45 +01:00
x01dc
0ebaa3a42f added tool for capstop calibration
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m58s
CI for csaxs_bec / test (push) Successful in 1m58s
2026-03-12 11:23:45 +01:00
x01dc
0a01dd4e36 removed blchk from flomni 2026-03-12 11:23:45 +01:00
x01dc
a28d9f0e20 added umvr for tweak 2026-03-12 11:23:45 +01:00
x01dc
4388ecc3b2 removed bl chk to replace by states 2026-03-12 11:23:45 +01:00
x01dc
349fa09a32 fixed syntax 2026-03-12 11:23:45 +01:00
x01dc
8298218913 added account check at startup 2026-03-12 11:23:45 +01:00
x01dc
1e510659a9 renamings and no bl check in x01 network 2026-03-12 11:23:45 +01:00
x01dc
a765373805 moved bl checker, tomo_id, tomo_reconstruct to separate, sharable classes 2026-03-12 11:23:44 +01:00
x01dc
0cad98da6d added light command to lamni 2026-03-12 11:23:44 +01:00
x01dc
6384d690e5 guitools 2026-03-12 11:23:44 +01:00
x01dc
c042a52730 WIP guitools initial version 2026-03-12 11:23:44 +01:00
x01dc
3490aca053 fixed ascii logo flomni and omny 2026-03-12 11:23:44 +01:00
315a32d9de refactor: commented code removed
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m57s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-11 22:05:01 +01:00
x12sa
deaa469ce1 fix(gui): adjustment to BW V3
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m55s
CI for csaxs_bec / test (pull_request) Successful in 1m59s
2026-03-11 16:50:14 +01:00
x01dc
774fc0dc36 set fixed on the script 2026-03-11 16:50:14 +01:00
x01dc
8b732a5de6 colorbar changed to greys 2026-03-11 16:50:14 +01:00
x01dc
240fcba4ef feat(xray_eye): alignment gui and script adapted to not use epics gui device 2026-03-11 16:50:14 +01:00
058dbf5e5b fix(allied_vision_camera): fix looping logic
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m58s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-11 16:48:33 +01:00
7b882653ad fix(allied_vision_camera): transpose fix
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
CI for csaxs_bec / test (pull_request) Successful in 1m59s
2026-03-11 16:09:39 +01:00
be9938ddb7 fix(camera): unify the live mode on cameras 2026-03-11 16:09:39 +01:00
2a7b068cc6 fix(ids_camera): live mode signal 2026-03-10 09:40:11 +01:00
73d91617e9 feat(allied-vision-camera): Add allied vision camera integration 2026-03-10 09:40:11 +01:00
6873ef8287 test: fix lamni test
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
CI for csaxs_bec / test (push) Successful in 1m58s
2026-03-06 15:11:28 +01:00
x01dc
70fa96bd58 added beck startup
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m53s
CI for csaxs_bec / test (push) Failing after 1m55s
2026-03-06 14:39:34 +01:00
x01dc
5155ba9b77 now reading encoder values for axes with encoder 2026-03-06 14:39:34 +01:00
488156fd87 documentation file for the 30 nm FZPs
All checks were successful
CI for csaxs_bec / test (push) Successful in 2m0s
CI for csaxs_bec / test (pull_request) Successful in 1m56s
2026-03-06 13:11:35 +01:00
4721ec404b fix(mcs): remove info logs
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m54s
CI for csaxs_bec / test (push) Successful in 1m58s
2026-03-04 09:13:55 +01:00
4d69f8f90f fix(bec_widgets): removed omny alignment old gui
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-02 21:00:14 +01:00
0f072a786e test: add tests for panda, fix tests for fermat scan
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-02 13:15:53 +01:00
05a1e3d8be refactor: cleanup docs and logs for most relevant devices 2026-03-02 13:15:53 +01:00
e9fd9084b8 refactor(csaxsdlpca200): cleanup docs 2026-03-02 13:15:53 +01:00
40ef387134 fix(panda): make complete asyncronous for PandaBoxOmny 2026-03-02 13:15:53 +01:00
x12sa
6ed84664f2 added docs for burst acquisition 2026-03-02 13:15:53 +01:00
x12sa
e5e3343da7 post startup script sessions for all setups, and required modifications 2026-03-02 13:15:53 +01:00
x12sa
c8866faccc logic for gain setting and readback for bpm amplifiers 2026-03-02 13:15:53 +01:00
x12sa
3b561c251c fix(config): remove panda test config 2026-03-02 13:15:53 +01:00
x12sa
bc187040ad refactor(panda-box): add pandaboxomny and refactor panda_box main integration 2026-03-02 13:15:53 +01:00
x12sa
efd27a27e8 fix(ferma-scan): fix flomni, lamni and omny fermat scans. add exp_time and frames_per_trigger 2026-03-02 13:15:53 +01:00
x12sa
7096ef3323 refactor(config): Update configs for bl_general and flomni 2026-03-02 13:15:53 +01:00
13378f24dd refactor(panda-box): refactor Pandabox, moving logic to base class 2026-03-02 13:15:53 +01:00
x01dc
f5b898ea1c feat: add panda box csaxs integration 2026-03-01 18:15:57 +01:00
3d62bea04b Update repo with template version v1.2.8
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m55s
CI for csaxs_bec / test (push) Successful in 1m54s
2026-02-27 16:25:22 +01:00
1518845d25 resolve merge conflicts 2026-02-27 16:25:22 +01:00
ff3b6686db Update repo with template version v1.2.7 2026-02-27 16:25:22 +01:00
55 changed files with 2668 additions and 1189 deletions

View File

@@ -2,7 +2,7 @@
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.2.2
_commit: v1.2.8
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: csaxs_bec

View File

@@ -28,7 +28,7 @@ on:
description: "Python version to use"
required: false
type: string
default: "3.11"
default: "3.12"
permissions:
pull-requests: write
@@ -44,7 +44,19 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/csaxs_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./csaxs_bec
- name: Lint for merge conflicts from template updates
shell: bash
# Find all Copier conflicts except this line
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
- name: Checkout BEC Core
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec
@@ -55,13 +67,6 @@ jobs:
- name: Checkout BEC Widgets
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/csaxs_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./csaxs_bec
- name: Install dependencies
shell: bash
run: |

View File

@@ -0,0 +1,62 @@
name: Create template upgrade PR for csaxs_bec
on:
workflow_dispatch:
permissions:
pull-requests: write
jobs:
create_update_branch_and_pr:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install tools
run: |
pip install copier PySide6
- name: Checkout
uses: actions/checkout@v4
- name: Perform update
run: |
git config --global user.email "bec_ci_staging@psi.ch"
git config --global user.name "BEC automated CI"
branch="chore/update-template-$(python -m uuid)"
echo "switching to branch $branch"
git checkout -b $branch
echo "Running copier update..."
output="$(copier update --trust --defaults --conflict inline 2>&1)"
echo "$output"
msg="$(printf '%s\n' "$output" | head -n 1)"
if ! grep -q "make_commit: true" .copier-answers.yml ; then
echo "Autocommit not made, committing..."
git add -A
git commit -a -m "$msg"
fi
if diff-index --quiet HEAD ; then
echo "No changes detected"
exit 0
fi
git push -u origin $branch
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"Template: $(echo $msg)\",
\"body\": \"This PR was created by Gitea Actions\",
\"head\": \"$(echo $branch)\",
\"base\": \"main\"
}"

View File

@@ -1,20 +0,0 @@
include:
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "csaxs"
target: "csaxs_bec"
branch: $CHILD_PIPELINE_BRANCH
pages:
stage: Deploy
needs: []
variables:
TARGET_BRANCH: $CI_COMMIT_REF_NAME
rules:
- if: "$CI_COMMIT_TAG != null"
variables:
TARGET_BRANCH: $CI_COMMIT_TAG
- if: '$CI_COMMIT_REF_NAME == "main" && $CI_PROJECT_PATH == "bec/csaxs_bec"'
script:
- curl -X POST -d "branches=$CI_COMMIT_REF_NAME" -d "token=$RTD_TOKEN" https://readthedocs.org/api/v2/webhook/sls-csaxs/270162/

View File

@@ -0,0 +1,188 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class LamniGuiToolsError(Exception):
pass
class LamniGuiTools:
def __init__(self):
self.text_box = None
self.progressbar = None
def set_client(self, client):
self.client = client
self.gui = self.client.gui
def lamnigui_show_gui(self):
if "lamni" in self.gui.windows:
self.gui.lamni.show()
else:
self.gui.new("lamni")
def lamnigui_stop_gui(self):
self.gui.lamni.hide()
def lamnigui_raise(self):
self.gui.lamni.raise_window()
def lamnigui_show_xeyealign(self):
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("xeyegui"):
self.lamnigui_remove_all_docks()
self.xeyegui = self.gui.lamni.new("xeyegui").new("XRayEye")
# start live
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
def _lamnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui,"lamni"):
if hasattr(self.gui.lamni,attribute_name):
return False
return True
def lamnigui_remove_all_docks(self):
self.gui.lamni.delete_all()
self.progressbar = None
self.text_box = None
def lamnigui_idle(self):
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("idle_text_box"):
self.lamnigui_remove_all_docks()
idle_text_box = self.gui.lamni.new("idle_textbox").new("TextBox")
text = (
"<pre>"
+ "██████╗ ███████╗ ██████╗ ██╗ █████╗ ███╗ ███╗███╗ ██╗██╗\n"
+ "██╔══██╗██╔════╝██╔════╝ ██║ ██╔══██╗████╗ ████║████╗ ██║██║\n"
+ "██████╔╝█████╗ ██║ ██║ ███████║██╔████╔██║██╔██╗ ██║██║\n"
+ "██╔══██╗██╔══╝ ██║ ██║ ██╔══██║██║╚██╔╝██║██║╚██╗██║██║\n"
+ "██████╔╝███████╗╚██████╗ ███████╗██║ ██║██║ ╚═╝ ██║██║ ╚████║██║\n"
+ "╚═════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝\n"
+ "</pre>"
)
idle_text_box.set_html_text(text)
def lamnigui_docs(self, filename: str | None = None):
import csaxs_bec
from pathlib import Path
print("The general lamni documentation is at \nhttps://sls-csaxs.readthedocs.io/en/latest/user/ptychography/lamni.html#user-ptychography-lamni")
csaxs_bec_basepath = Path(csaxs_bec.__file__).parent
docs_folder = (
csaxs_bec_basepath /
"bec_ipython_client" / "plugins" / "lamni" / "docs"
)
if not docs_folder.is_dir():
raise NotADirectoryError(f"Docs folder not found: {docs_folder}")
pdfs = sorted(docs_folder.glob("*.pdf"))
if not pdfs:
raise FileNotFoundError(f"No PDF files found in {docs_folder}")
# --- Resolve PDF ------------------------------------------------------
if filename is not None:
pdf_file = docs_folder / filename
if not pdf_file.exists():
raise FileNotFoundError(f"Requested file not found: {filename}")
else:
print("\nAvailable lamni documentation PDFs:\n")
for i, pdf in enumerate(pdfs, start=1):
print(f" {i:2d}) {pdf.name}")
print()
while True:
try:
choice = int(input(f"Select a file (1{len(pdfs)}): "))
if 1 <= choice <= len(pdfs):
pdf_file = pdfs[choice - 1]
break
print(f"Enter a number between 1 and {len(pdfs)}.")
except ValueError:
print("Invalid input. Please enter a number.")
# --- GUI handling (active existence check) ----------------------------
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("PdfViewerWidget"):
self.lamnigui_remove_all_docks()
self.pdf_viewer = self.gui.lamni.new(widget="PdfViewerWidget")
# --- Load PDF ---------------------------------------------------------
self.pdf_viewer.PdfViewerWidget.load_pdf(str(pdf_file.resolve()))
print(f"\nLoaded: {pdf_file.name}\n")
def _lamnicam_check_device_exists(self, device):
try:
device
except:
return False
else:
return True
def lamnigui_show_progress(self):
self.lamnigui_show_gui()
if self._lamnigui_check_attribute_not_exists("progressbar"):
self.lamnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.lamni.new("progressbar").new("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
self.progressbar.enable_auto_updates(False)
# Set precision for the self.progressbar display
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
# Setting multiple rigns with different values
self.progressbar.set_number_of_bars(3)
self.progressbar.rings[0].set_update("manual")
self.progressbar.rings[1].set_update("manual")
self.progressbar.rings[2].set_update("scan")
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.lamni.new(name="progress_text").new("TextBox")
self._lamnigui_update_progress()
def _lamnigui_update_progress(self):
if self.progressbar is not None:
progress = self.progress["projection"] / self.progress["total_projections"] * 100
subtomo_progress = (
self.progress["subtomo_projection"]
/ self.progress["subtomo_total_projections"]
* 100
)
self.progressbar.set_value([progress, subtomo_progress, 0])
if self.text_box is not None:
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
self.text_box.set_plain_text(text)
if __name__ == "__main__":
from bec_lib.client import BECClient
from bec_widgets.cli.client_utils import BECGuiClient
client = BECClient()
client.start()
client.gui = BECGuiClient()
lamni_gui = LamniGuiTools(client)
lamni_gui.lamnigui_show_gui()
lamni_gui.lamnigui_show_progress()

View File

@@ -2,7 +2,6 @@ import builtins
import datetime
import os
import subprocess
import threading
import time
from pathlib import Path
@@ -12,7 +11,12 @@ from bec_lib.alarm_handler import AlarmBase
from bec_lib.pdf_writer import PDFWriter
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
OMNYTools,
PtychoReconstructor,
TomoIDManager,
)
from csaxs_bec.bec_ipython_client.plugins.LamNI.gui_tools import LamniGuiTools
from .alignment import XrayEyeAlign
from .lamni_optics_mixin import LaMNIInitStages, LamNIOpticsMixin
@@ -23,29 +27,29 @@ if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
mv = builtins.__dict__.get("mv")
umvr = builtins.__dict__.get("umvr")
class LamNI(LamNIOpticsMixin):
class LamNI(LamNIOpticsMixin, LamniGuiTools):
def __init__(self, client):
super().__init__()
self.client = client
self.device_manager = client.device_manager
self.align = XrayEyeAlign(client, self)
self.init = LaMNIInitStages(client)
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
self._check_msgs = []
# Extracted collaborators
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
self.OMNYTools = OMNYTools(self.client)
self.tomo_id = -1
self.special_angles = []
self.special_angle_repeats = 20
self.special_angle_tolerance = 20
self._current_special_angles = []
self._beam_is_okay = True
self._stop_beam_check_event = None
self.beam_check_thread = None
self.OMNYTools = OMNYTools(self.client)
# Progress tracking
self.progress = {}
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
@@ -56,105 +60,6 @@ class LamNI(LamNIOpticsMixin):
self.progress["total_projections"] = 1
self.progress["angle"] = 0
# ------------------------------------------------------------------
# Beamline checks
# ------------------------------------------------------------------
def get_beamline_checks_enabled(self):
print(
f"Shutter: {self.check_shutter}\nFOFB: {self.check_fofb}\nLight available:"
f" {self.check_light_available}"
)
@property
def beamline_checks_enabled(self):
return {
"shutter": self.check_shutter,
"fofb": self.check_fofb,
"light available": self.check_light_available,
}
@beamline_checks_enabled.setter
def beamline_checks_enabled(self, val: bool):
self.check_shutter = val
self.check_light_available = val
self.check_fofb = val
self.get_beamline_checks_enabled()
def _run_beamline_checks(self):
msgs = []
dev = builtins.__dict__.get("dev")
try:
if self.check_shutter:
shutter_val = dev.x12sa_es1_shutter_status.read(cached=True)
if shutter_val["value"].lower() != "open":
self._beam_is_okay = False
msgs.append("Check beam failed: Shutter is closed.")
if self.check_light_available:
machine_status = dev.sls_machine_status.read(cached=True)
if machine_status["value"] not in ["Light Available", "Light-Available"]:
self._beam_is_okay = False
msgs.append("Check beam failed: Light not available.")
if self.check_fofb:
fast_orbit_feedback = dev.sls_fast_orbit_feedback.read(cached=True)
if fast_orbit_feedback["value"] != "running":
self._beam_is_okay = False
msgs.append("Check beam failed: Fast orbit feedback is not running.")
except Exception:
logger.warning("Failed to check beam.")
return msgs
def _check_beam(self):
while not self._stop_beam_check_event.is_set():
self._check_msgs = self._run_beamline_checks()
if not self._beam_is_okay:
self._stop_beam_check_event.set()
time.sleep(1)
def _start_beam_check(self):
self._beam_is_okay = True
self._stop_beam_check_event = threading.Event()
self.beam_check_thread = threading.Thread(target=self._check_beam, daemon=True)
self.beam_check_thread.start()
def _was_beam_okay(self):
self._stop_beam_check_event.set()
self.beam_check_thread.join()
return self._beam_is_okay
def _print_beamline_checks(self):
for msg in self._check_msgs:
logger.warning(msg)
def _wait_for_beamline_checks(self):
self._print_beamline_checks()
try:
msg = bec.logbook.LogbookMessage()
msg.add_text(
"<p><mark class='pen-red'><strong>Beamline checks failed at"
f" {str(datetime.datetime.now())}: {''.join(self._check_msgs)}</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send update to SciLog.")
while True:
self._beam_is_okay = True
self._check_msgs = self._run_beamline_checks()
if self._beam_is_okay:
break
self._print_beamline_checks()
time.sleep(1)
try:
msg = bec.logbook.LogbookMessage()
msg.add_text(
"<p><mark class='pen-red'><strong>Operation resumed at"
f" {str(datetime.datetime.now())}.</strong></mark></p>"
).add_tag(["BEC", "beam_check"])
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to send update to SciLog.")
# ------------------------------------------------------------------
# Special angles
@@ -212,7 +117,6 @@ class LamNI(LamNIOpticsMixin):
def feedback_status(self):
self.device_manager.devices.rtx.controller.show_feedback_status()
def show_interferometer_positions(self):
self.device_manager.devices.rtx.controller.show_feedback_status()
@@ -222,6 +126,12 @@ class LamNI(LamNIOpticsMixin):
def show_analog_signals(self):
return self.device_manager.devices.rtx.controller.show_analog_signals()
def lights_off(self):
self.device_manager.devices.lsamx.controller.lights_off()
def lights_on(self):
self.device_manager.devices.lsamx.controller.lights_on()
# ------------------------------------------------------------------
# Global parameters (backed by BEC global vars)
# ------------------------------------------------------------------
@@ -371,6 +281,7 @@ class LamNI(LamNIOpticsMixin):
@ptycho_reconstruct_foldername.setter
def ptycho_reconstruct_foldername(self, val: str):
self.client.set_global_var("ptycho_reconstruct_foldername", val)
self.reconstructor.folder_name = val # keep reconstructor in sync
@property
def tomo_angle_stepsize(self):
@@ -491,23 +402,22 @@ class LamNI(LamNIOpticsMixin):
)
# ------------------------------------------------------------------
# Sample database
# Sample database — delegated to TomoIDManager in omny general tools
# ------------------------------------------------------------------
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
):
"""Add a sample to the OMNY sample database and retrieve the tomo id."""
subprocess.run(
"wget --user=omny --password=samples -q -O /tmp/currsamplesnr.txt"
f" 'https://omny.web.psi.ch/samples/newmeasurement.php?sample={samplename}"
f"&date={date}&eaccount={eaccount}&scannr={scan_number}&setup={setup}"
f"&additional={sample_additional_info}&user={user}'",
shell=True,
return self.tomo_id_manager.register(
sample_name=samplename,
date=date,
eaccount=eaccount,
scan_number=scan_number,
setup=setup,
additional_info=sample_additional_info,
user=user,
)
with open("/tmp/currsamplesnr.txt") as tomo_number_file:
tomo_number = int(tomo_number_file.read())
return tomo_number
# ------------------------------------------------------------------
# Scan projection
@@ -524,7 +434,6 @@ class LamNI(LamNIOpticsMixin):
for stitch_x in range(-self.lamni_stitch_x, self.lamni_stitch_x + 1):
for stitch_y in range(-self.lamni_stitch_y, self.lamni_stitch_y + 1):
# pylint: disable=undefined-variable
self._current_scan_list.append(bec.queue.next_scan_number)
log_message = (
f"{str(datetime.datetime.now())}: LamNI scan projection at angle {angle},"
@@ -562,18 +471,11 @@ class LamNI(LamNIOpticsMixin):
def tomo_reconstruct(self, base_path="~/Data10/specES1"):
"""Write the tomo reconstruct file for the reconstruction queue."""
bec = builtins.__dict__.get("bec")
base_path = os.path.expanduser(base_path)
ptycho_queue_path = Path(os.path.join(base_path, self.ptycho_reconstruct_foldername))
ptycho_queue_path.mkdir(parents=True, exist_ok=True)
last_scan_number = bec.queue.next_scan_number - 1
ptycho_queue_file = os.path.abspath(
os.path.join(ptycho_queue_path, f"scan_{last_scan_number:05d}.dat")
self.reconstructor.write(
scan_list=self._current_scan_list,
next_scan_number=bec.queue.next_scan_number,
base_path=base_path,
)
with open(ptycho_queue_file, "w") as queue_file:
scans = " ".join([str(scan) for scan in self._current_scan_list])
queue_file.write(f"p.scan_number {scans}\n")
queue_file.write("p.check_nextscan_started 1\n")
def _at_each_angle(self, angle: float) -> None:
self.tomo_scan_projection(angle)
@@ -635,7 +537,6 @@ class LamNI(LamNIOpticsMixin):
print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
self._start_beam_check()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
@@ -662,10 +563,9 @@ class LamNI(LamNIOpticsMixin):
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
#todo here bl chk, if ok then successfull true
successful = True
def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False):
"""Return the ii-th golden ratio angle within sorted bunches and its subtomo number."""
@@ -1011,3 +911,39 @@ class LamNI(LamNIOpticsMixin):
)
self.client.logbook.send_logbook_message(msg)
def get_calibration_of_capstops_left_and_right(self):
import time
print("""
Manual on how to center the Piezo stage first.
To obtain the center voltages one can move in closed loop to the interferometer
vertically and observe the capacitive readback signal. Check the limits of the
travel, move to center and obtain the required centering voltage.
Example: At 0 deg, accessible rty -60 to 51. So the init was 5 microns off.
Then this routine here will provide data for the new capstop left and right.
""")
angle = 0
umv(dev.lsamrot,0)
print("Capstop right\nAngle, Voltage1, Voltage2")
mv(dev.lsamrot,361)
while angle <= 360:
angle = dev.lsamrot.readback.get()
voltage1=float(dev.lsamrot.controller.socket_put_and_receive("MG@AN[1]"))
voltage2=float(dev.lsamrot.controller.socket_put_and_receive("MG@AN[2]"))
if angle<360:
print(f"{angle},{voltage1},{voltage2}")
time.sleep(.3)
time.sleep(10)
print("\nCapstop left\nAngle, Voltage1, Voltage2")
mv(dev.lsamrot,-1)
while angle > 0:
angle = dev.lsamrot.readback.get()
voltage1=float(dev.lsamrot.controller.socket_put_and_receive("MG@AN[1]"))
voltage2=float(dev.lsamrot.controller.socket_put_and_receive("MG@AN[2]"))
if angle>0:
print(f"{angle},{voltage1},{voltage2}")
time.sleep(.3)
print("Finished")

View File

@@ -9,9 +9,11 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class LamNIInitError(Exception):
pass
@@ -49,7 +51,7 @@ class LaMNIInitStages:
self.drive_axis_to_limit(dev.lsamrot, "forward")
dev.lsamrot.enabled = False
print("Now hard reboot the controller and run the initialization routine again.")
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
print("Remark: The controller will be disabled in bec. It will be enabled by running the init route, \nbut in case needed, to enable manually set dev.lsamrot.enabled=True")
return
if self.OMNYTools.yesno(

View File

@@ -0,0 +1,442 @@
"""
csaxs_dlpca200.py
=================
BEC control script for FEMTO DLPCA-200 Variable Gain Low Noise Current Amplifiers
connected to Galil RIO digital outputs.
DLPCA-200 Remote Control (datasheet page 4)
-------------------------------------------
Sub-D pin -> function:
Pin 10 -> gain LSB (digital out channel, index 0 in bit-tuple)
Pin 11 -> gain MID (digital out channel, index 1 in bit-tuple)
Pin 12 -> gain MSB (digital out channel, index 2 in bit-tuple)
Pin 13 -> coupling LOW = AC, HIGH = DC
Pin 14 -> speed mode HIGH = low noise (Pin14=1), LOW = high speed (Pin14=0)
Gain truth table (MSB, MID, LSB):
0,0,0 -> low-noise: 1e3 high-speed: 1e5
0,0,1 -> low-noise: 1e4 high-speed: 1e6
0,1,0 -> low-noise: 1e5 high-speed: 1e7
0,1,1 -> low-noise: 1e6 high-speed: 1e8
1,0,0 -> low-noise: 1e7 high-speed: 1e9
1,0,1 -> low-noise: 1e8 high-speed: 1e10
1,1,0 -> low-noise: 1e9 high-speed: 1e11
Strategy: prefer low-noise mode (1e3-1e9). For 1e10 and 1e11,
automatically fall back to high-speed mode.
Device wiring example (galilrioesxbox):
bpm4: Pin10->ch0, Pin11->ch1, Pin12->ch2, Pin13->ch3, Pin14->ch4
bim: Pin10->ch6, Pin11->ch7, Pin12->ch8, Pin13->ch9, Pin14->ch10
Usage examples
--------------
csaxs_amp = cSAXSDLPCA200(client)
csaxs_amp.set_gain("bpm4", 1e7) # low-noise if possible
csaxs_amp.set_gain("bim", 1e10) # auto high-speed
csaxs_amp.set_coupling("bpm4", "DC")
csaxs_amp.set_coupling("bim", "AC")
csaxs_amp.info("bpm4") # print current settings
csaxs_amp.info_all() # print all configured amplifiers
"""
import builtins
from bec_lib import bec_logger
logger = bec_logger.logger
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
# ---------------------------------------------------------------------------
# Amplifier registry
# ---------------------------------------------------------------------------
# Each entry describes one DLPCA-200 amplifier connected to a Galil RIO.
#
# Keys inside "channels":
# gain_lsb -> digital output channel number wired to DLPCA-200 Pin 10
# gain_mid -> digital output channel number wired to DLPCA-200 Pin 11
# gain_msb -> digital output channel number wired to DLPCA-200 Pin 12
# coupling -> digital output channel number wired to DLPCA-200 Pin 13
# speed_mode -> digital output channel number wired to DLPCA-200 Pin 14
#
# To add a new amplifier, simply extend this dict.
# ---------------------------------------------------------------------------
DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
"bpm4": {
"rio_device": "galilrioesxbox",
"description": "Beam Position Monitor 4 current amplifier",
"channels": {
"gain_lsb": 0, # Pin 10 -> Galil ch0
"gain_mid": 1, # Pin 11 -> Galil ch1
"gain_msb": 2, # Pin 12 -> Galil ch2
"coupling": 3, # Pin 13 -> Galil ch3
"speed_mode": 4, # Pin 14 -> Galil ch4
},
},
"bim": {
"rio_device": "galilrioesxbox",
"description": "Beam Intensity Monitor current amplifier",
"channels": {
"gain_lsb": 6, # Pin 10 -> Galil ch6
"gain_mid": 7, # Pin 11 -> Galil ch7
"gain_msb": 8, # Pin 12 -> Galil ch8
"coupling": 9, # Pin 13 -> Galil ch9
"speed_mode": 10, # Pin 14 -> Galil ch10
},
},
}
# ---------------------------------------------------------------------------
# DLPCA-200 gain encoding tables
# ---------------------------------------------------------------------------
# (msb, mid, lsb) -> gain in V/A
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
(0, 0, 1): int(1e4),
(0, 1, 0): int(1e5),
(0, 1, 1): int(1e6),
(1, 0, 0): int(1e7),
(1, 0, 1): int(1e8),
(1, 1, 0): int(1e9),
}
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
(0, 0, 0): int(1e5),
(0, 0, 1): int(1e6),
(0, 1, 0): int(1e7),
(0, 1, 1): int(1e8),
(1, 0, 0): int(1e9),
(1, 0, 1): int(1e10),
(1, 1, 0): int(1e11),
}
# Inverse maps: gain -> (msb, mid, lsb, low_noise_flag)
# low_noise_flag: True = Pin14 HIGH, False = Pin14 LOW
_GAIN_TO_BITS: dict[int, tuple] = {}
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
_GAIN_TO_BITS[_gain] = (*_bits, True)
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
_GAIN_TO_BITS[_gain] = (*_bits, False)
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class cSAXSDLPCA200Error(Exception):
pass
class cSAXSDLPCA200:
"""
Control class for FEMTO DLPCA-200 current amplifiers connected via Galil RIO
digital outputs in a BEC environment.
Supports:
- Forward control: set_gain(), set_coupling()
- Readback reporting: info(), info_all(), read_settings()
- Robust error handling and logging following cSAXS conventions.
"""
TAG = "[DLPCA200]"
def __init__(self, client, config: dict | None = None) -> None:
"""
Parameters
----------
client : BEC client object (passed through for future use)
config : optional override for DLPCA200_AMPLIFIER_CONFIG.
Falls back to the module-level dict if not provided.
"""
self.client = client
self._config: dict[str, dict] = config if config is not None else DLPCA200_AMPLIFIER_CONFIG
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _require_dev(self) -> None:
if dev is None:
raise cSAXSDLPCA200Error(
f"{self.TAG} BEC 'dev' namespace is not available in this session."
)
def _get_cfg(self, amp_name: str) -> dict:
"""Return config dict for a named amplifier, raising on unknown names."""
if amp_name not in self._config:
known = ", ".join(sorted(self._config.keys()))
raise cSAXSDLPCA200Error(f"{self.TAG} Unknown amplifier '{amp_name}'. Known: [{known}]")
return self._config[amp_name]
def _get_rio(self, amp_name: str):
"""Return the live RIO device object for a given amplifier."""
self._require_dev()
cfg = self._get_cfg(amp_name)
rio_name = cfg["rio_device"]
try:
rio = getattr(dev, rio_name)
except AttributeError:
raise cSAXSDLPCA200Error(f"{self.TAG} RIO device '{rio_name}' not found in BEC 'dev'.")
return rio
def _dout_get(self, rio, ch: int) -> int:
"""Read one digital output channel (returns 0 or 1)."""
attr = getattr(rio.digital_out, f"ch{ch}")
val = attr.get()
return int(val)
def _dout_set(self, rio, ch: int, value: bool) -> None:
"""Write one digital output channel (True=HIGH=1, False=LOW=0)."""
attr = getattr(rio.digital_out, f"ch{ch}")
attr.set(value)
def _read_gain_bits(self, amp_name: str) -> tuple[int, int, int, int]:
"""
Read current gain bit-state from hardware.
Returns
-------
(msb, mid, lsb, speed_mode)
speed_mode: 1 = low-noise (Pin14=HIGH), 0 = high-speed (Pin14=LOW)
"""
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
msb = self._dout_get(rio, ch["gain_msb"])
mid = self._dout_get(rio, ch["gain_mid"])
lsb = self._dout_get(rio, ch["gain_lsb"])
speed_mode = self._dout_get(rio, ch["speed_mode"])
return msb, mid, lsb, speed_mode
def _decode_gain(self, msb: int, mid: int, lsb: int, speed_mode: int) -> int | None:
"""
Decode hardware bit-state into gain value (V/A).
speed_mode=1 -> low-noise table, speed_mode=0 -> high-speed table.
Returns None if the bit combination is not in the table.
"""
bits = (msb, mid, lsb)
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
# ------------------------------------------------------------------
# Public API - control
# ------------------------------------------------------------------
def set_gain(self, amp_name: str, gain: float, force_high_speed: bool = False) -> None:
"""
Set the transimpedance gain of a DLPCA-200 amplifier.
The method automatically selects low-noise mode (Pin14=HIGH) whenever
the requested gain is achievable in low-noise mode (1e3 - 1e9 V/A).
For gains of 1e10 and 1e11 V/A, high-speed mode is used automatically.
Parameters
----------
amp_name : str
Amplifier name as defined in DLPCA200_AMPLIFIER_CONFIG (e.g. "bpm4").
gain : float or int
Target gain in V/A. Must be one of:
1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11.
force_high_speed : bool, optional
If True, force high-speed (low-noise=False) mode even for gains
below 1e10. Default: False (prefer low-noise).
Examples
--------
csaxs_amp.set_gain("bpm4", 1e7) # low-noise mode (automatic)
csaxs_amp.set_gain("bim", 1e10) # high-speed mode (automatic)
csaxs_amp.set_gain("bpm4", 1e7, force_high_speed=True) # override to high-speed
"""
gain_int = int(gain)
if gain_int not in _GAIN_TO_BITS:
valid_str = ", ".join(
f"1e{int(round(__import__('math').log10(g)))}" for g in VALID_GAINS
)
raise cSAXSDLPCA200Error(
f"{self.TAG} Invalid gain {gain:.2e} V/A for '{amp_name}'. "
f"Valid values: {valid_str}"
)
msb, mid, lsb, low_noise_preferred = _GAIN_TO_BITS[gain_int]
# Apply force_high_speed override
if force_high_speed and low_noise_preferred:
# Check if this gain is achievable in high-speed mode
hs_entry = next(
(bits for bits, g in _GAIN_BITS_HIGH_SPEED.items() if g == gain_int), None
)
if hs_entry is None:
raise cSAXSDLPCA200Error(
f"{self.TAG} Gain {gain:.2e} V/A is not achievable in high-speed mode "
f"for '{amp_name}'."
)
msb, mid, lsb = hs_entry
low_noise_preferred = False
use_low_noise = low_noise_preferred and not force_high_speed
try:
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
self._dout_set(rio, ch["gain_msb"], bool(msb))
self._dout_set(rio, ch["gain_mid"], bool(mid))
self._dout_set(rio, ch["gain_lsb"], bool(lsb))
self._dout_set(rio, ch["speed_mode"], use_low_noise) # True=low-noise
mode_str = "low-noise" if use_low_noise else "high-speed"
logger.info(
f"{self.TAG} [{amp_name}] gain set to {gain_int:.2e} V/A "
f"({mode_str} mode, bits MSB={msb} MID={mid} LSB={lsb})"
)
print(
f"{amp_name}: gain -> {gain_int:.2e} V/A [{mode_str}] "
f"(bits: MSB={msb} MID={mid} LSB={lsb})"
)
except cSAXSDLPCA200Error:
raise
except Exception as exc:
raise cSAXSDLPCA200Error(
f"{self.TAG} Failed to set gain on '{amp_name}': {exc}"
) from exc
def set_coupling(self, amp_name: str, coupling: str) -> None:
"""
Set AC or DC coupling on a DLPCA-200 amplifier.
Parameters
----------
amp_name : str
Amplifier name (e.g. "bpm4", "bim").
coupling : str
"AC" or "DC" (case-insensitive).
DC -> Pin13 HIGH, AC -> Pin13 LOW.
Examples
--------
csaxs_amp.set_coupling("bpm4", "DC")
csaxs_amp.set_coupling("bim", "AC")
"""
coupling_upper = coupling.strip().upper()
if coupling_upper not in ("AC", "DC"):
raise cSAXSDLPCA200Error(
f"{self.TAG} Invalid coupling '{coupling}' for '{amp_name}'. " f"Use 'AC' or 'DC'."
)
pin13_high = coupling_upper == "DC"
try:
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
self._dout_set(rio, ch["coupling"], pin13_high)
logger.info(f"{self.TAG} [{amp_name}] coupling set to {coupling_upper}")
print(f"{amp_name}: coupling -> {coupling_upper}")
except cSAXSDLPCA200Error:
raise
except Exception as exc:
raise cSAXSDLPCA200Error(
f"{self.TAG} Failed to set coupling on '{amp_name}': {exc}"
) from exc
# ------------------------------------------------------------------
# Public API - readback / reporting
# ------------------------------------------------------------------
def read_settings(self, amp_name: str) -> dict:
"""
Read back the current settings from hardware digital outputs.
Returns
-------
dict with keys:
"amp_name" : str
"gain" : int or None - gain in V/A (None if unknown bit pattern)
"mode" : str - "low-noise" or "high-speed"
"coupling" : str - "AC" or "DC"
"bits" : dict - raw bit values {msb, mid, lsb, speed_mode, coupling}
"""
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
msb = self._dout_get(rio, ch["gain_msb"])
mid = self._dout_get(rio, ch["gain_mid"])
lsb = self._dout_get(rio, ch["gain_lsb"])
speed_mode = self._dout_get(rio, ch["speed_mode"])
coupling_bit = self._dout_get(rio, ch["coupling"])
gain = self._decode_gain(msb, mid, lsb, speed_mode)
mode = "low-noise" if speed_mode else "high-speed"
coupling = "DC" if coupling_bit else "AC"
return {
"amp_name": amp_name,
"gain": gain,
"mode": mode,
"coupling": coupling,
"bits": {
"msb": msb,
"mid": mid,
"lsb": lsb,
"speed_mode": speed_mode,
"coupling": coupling_bit,
},
}
def info(self, amp_name: str) -> None:
"""
Print a plain summary of the current settings for one amplifier.
Example output
--------------
Amplifier : bpm4
Description : Beam Position Monitor 4 current amplifier
RIO device : galilrioesxbox
Gain : 1.00e+07 V/A
Mode : low-noise
Coupling : DC
Raw bits : MSB=1 MID=0 LSB=0 speed=1 coup=1
"""
cfg = self._get_cfg(amp_name)
try:
s = self.read_settings(amp_name)
except Exception as exc:
print(f"{self.TAG} [{amp_name}] Could not read settings: {exc}")
return
gain_str = (
f"{s['gain']:.2e} V/A" if s["gain"] is not None else "UNKNOWN (invalid bit pattern)"
)
bits = s["bits"]
print(f" {'Amplifier':<12}: {amp_name}")
print(f" {'Description':<12}: {cfg.get('description', '')}")
print(f" {'RIO device':<12}: {cfg['rio_device']}")
print(f" {'Gain':<12}: {gain_str}")
print(f" {'Mode':<12}: {s['mode']}")
print(f" {'Coupling':<12}: {s['coupling']}")
print(
f" {'Raw bits':<12}: MSB={bits['msb']} MID={bits['mid']} LSB={bits['lsb']} speed={bits['speed_mode']} coup={bits['coupling']}"
)
def info_all(self) -> None:
"""
Print a plain summary for ALL configured amplifiers.
"""
print("\nDLPCA-200 Amplifier Status Report")
print("-" * 40)
for amp_name in sorted(self._config.keys()):
self.info(amp_name)
print()
def list_amplifiers(self) -> list[str]:
"""Return sorted list of configured amplifier names."""
return sorted(self._config.keys())

View File

@@ -41,8 +41,10 @@ import builtins
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class cSAXSFilterTransmission:
"""

View File

@@ -8,11 +8,14 @@ from bec_lib import bec_logger
logger = bec_logger.logger
# Pull BEC globals if present
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class cSAXSInitSmaractStagesError(Exception):
pass
@@ -34,6 +37,92 @@ class cSAXSInitSmaractStages:
# ------------------------------
# Internal helpers (runtime-based)
# ------------------------------
def _ensure_all_session_devices_enabled(self, selection: set | None = None, try_enable: bool = True):
"""
Ensure all session devices (or a selection) that define 'bl_smar_stage' are enabled.
Parameters
----------
selection : set | None
If provided, only devices in this set are considered.
try_enable : bool
If True, attempt to set device.enabled = True for devices that expose 'enabled' and are False.
If False, only report status without changing it.
Returns
-------
dict
{
"enabled_now": [device_names enabled by this call],
"already_enabled": [device_names already enabled or without 'enabled' attr],
"failed": [device_names that could not be enabled],
"inaccessible": [device_names not accessible]
}
"""
enabled_now = []
already_enabled = []
failed = []
inaccessible = []
# Build axis map to restrict to SmarAct-based devices (same logic as other helpers)
axis_map = self._build_session_axis_map(selection=selection)
for dev_name in sorted(axis_map.keys()):
try:
d = self._get_device_object(dev_name)
if d is None:
inaccessible.append(dev_name)
logger.warning(f"[cSAXS] Device {dev_name} not accessible.")
continue
# If device has no 'enabled' attribute, treat as already enabled/usable
if not hasattr(d, "enabled"):
already_enabled.append(dev_name)
continue
# If already enabled
try:
if getattr(d, "enabled"):
already_enabled.append(dev_name)
continue
except Exception:
# If reading enabled fails, treat as inaccessible for safety
failed.append(dev_name)
logger.warning(f"[cSAXS] Could not read 'enabled' for {dev_name}.")
continue
# Device exists and is disabled
if try_enable:
try:
logger.info(f"[cSAXS] Enabling device {dev_name} (was disabled).")
setattr(d, "enabled", True)
# small delay to let device initialize if needed
time.sleep(0.05)
if getattr(d, "enabled"):
enabled_now.append(dev_name)
logger.info(f"[cSAXS] Device {dev_name} enabled.")
else:
failed.append(dev_name)
logger.warning(f"[cSAXS] Device {dev_name} still disabled after enabling attempt.")
except Exception as exc:
failed.append(dev_name)
logger.error(f"[cSAXS] Failed to enable {dev_name}: {exc}")
else:
# Not trying to enable, just report
failed.append(dev_name)
except Exception as exc:
failed.append(dev_name)
logger.error(f"[cSAXS] _ensure_all_session_devices_enabled error for {dev_name}: {exc}")
return {
"enabled_now": enabled_now,
"already_enabled": already_enabled,
"failed": failed,
"inaccessible": inaccessible,
}
def _yesno(self, question: str, default: str = "y") -> bool:
"""
Use OMNYTools.yesno if available; otherwise default to 'yes' (or fallback to input()).
@@ -104,6 +193,7 @@ class cSAXSInitSmaractStages:
# ------------------------------
# Public API
# ------------------------------
def smaract_reference_stages(self, force: bool = False, devices_to_reference=None):
"""
Reference SmarAct stages using runtime discovery.
@@ -164,6 +254,19 @@ class cSAXSInitSmaractStages:
devices_to_reference = [devices_to_reference]
selection = set(devices_to_reference) if devices_to_reference else None
# First: ensure all relevant devices are enabled before attempting referencing
enable_report = self._ensure_all_session_devices_enabled(selection=selection, try_enable=True)
if enable_report["failed"]:
logger.warning(
"[cSAXS] Some devices could not be enabled before referencing: "
+ ", ".join(sorted(enable_report["failed"]))
)
if enable_report["inaccessible"]:
logger.warning(
"[cSAXS] Some devices were inaccessible before referencing: "
+ ", ".join(sorted(enable_report["inaccessible"]))
)
# Build axis map for selected devices (or all devices present)
axis_map = self._build_session_axis_map(selection=selection)
if selection:
@@ -171,7 +274,6 @@ class cSAXSInitSmaractStages:
if unknown:
print(f"Unknown devices requested or missing 'bl_smar_stage' (ignored): {unknown}")
newly_referenced = []
already_referenced = []
failed = []
@@ -188,6 +290,17 @@ class cSAXSInitSmaractStages:
failed.append(dev_name)
continue
# If device exposes 'enabled' and is False, skip (we already tried enabling above)
try:
if hasattr(d, "enabled") and not getattr(d, "enabled"):
print(f"{dev_name}: device disabled, skipping.")
failed.append(dev_name)
continue
except Exception:
print(f"{dev_name}: could not read enabled state, skipping.")
failed.append(dev_name)
continue
try:
is_ref = d.controller.axis_is_referenced(ch)
@@ -243,7 +356,17 @@ class cSAXSInitSmaractStages:
def smaract_check_all_referenced(self):
"""
Check reference state for all SmarAct devices that define 'bl_smar_stage'.
This now enables all relevant devices first (attempt), then performs the checks.
"""
# Attempt to enable all relevant devices first (do not force enabling if you prefer)
enable_report = self._ensure_all_session_devices_enabled(selection=None, try_enable=True)
if enable_report["enabled_now"]:
print("Now enabled devices which were disabled before: " + ", ".join(sorted(enable_report["enabled_now"])))
if enable_report["failed"]:
print("Could not enable: " + ", ".join(sorted(enable_report["failed"])))
if enable_report["inaccessible"]:
print("Inaccessible: " + ", ".join(sorted(enable_report["inaccessible"])))
axis_map = self._build_session_axis_map(selection=None)
for dev_name in sorted(axis_map.keys()):
ch = axis_map[dev_name]
@@ -251,6 +374,16 @@ class cSAXSInitSmaractStages:
if d is None:
print(f"{dev_name}: device not accessible or unsupported.")
continue
# Skip devices that expose 'enabled' and are False
try:
if hasattr(d, "enabled") and not getattr(d, "enabled"):
print(f"{dev_name} (axis {ch}) is disabled; skipping reference check.")
continue
except Exception:
print(f"{dev_name} (axis {ch}) enabled-state unknown; skipping.")
continue
try:
if d.controller.axis_is_referenced(ch):
print(f"{dev_name} (axis {ch}) is referenced.")
@@ -259,6 +392,7 @@ class cSAXSInitSmaractStages:
except Exception as e:
print(f"Error checking {dev_name} (axis {ch}): {e}")
def smaract_components_to_initial_position(self, devices_to_move=None):
"""
Move selected (or all) SmarAct-based components to their configured init_position.
@@ -383,7 +517,6 @@ class cSAXSInitSmaractStages:
if not self._yesno("Proceed with the motions listed above?", "y"):
logger.info("[cSAXS] Motion to initial position aborted by user.")
return
# --- Execution phase (SIMULTANEOUS MOTION) ---
if umv is None:
logger.error("[cSAXS] 'umv' is not available in this session.")

View File

@@ -15,15 +15,21 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_optics_mixin import FlomniOpticsMixin
from csaxs_bec.bec_ipython_client.plugins.flomni.x_ray_eye_align import XrayEyeAlign
from csaxs_bec.bec_ipython_client.plugins.flomni.gui_tools import flomniGuiTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
OMNYTools,
PtychoReconstructor,
TomoIDManager,
)
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class FlomniToolsError(Exception):
@@ -63,9 +69,6 @@ class FlomniError(Exception):
# print("Please expicitely confirm y or n.")
class FlomniInitStagesMixin:
def flomni_init_stages(self):
@@ -973,36 +976,49 @@ class FlomniAlignmentMixin:
use_vertical_default_values=True,
):
"""
Read the alignment offset from the given directory and set the global parameter
tomo_alignment_fit.
Read the alignment parameters from xray eye fit.
Args:
dir_path (str, optional): The directory to read the alignment offset from. Defaults to os.path.expanduser("~/Data10/specES1/internal/").
"""
tomo_alignment_fit = np.zeros((2, 5))
with open(os.path.join(dir_path, "ptychotomoalign_Ax.txt"), "r") as file:
tomo_alignment_fit[0][0] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Ax.txt"), "r") as file:
# tomo_alignment_fit[0][0] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Bx.txt"), "r") as file:
tomo_alignment_fit[0][1] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Bx.txt"), "r") as file:
# tomo_alignment_fit[0][1] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Cx.txt"), "r") as file:
tomo_alignment_fit[0][2] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Cx.txt"), "r") as file:
# tomo_alignment_fit[0][2] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Ay.txt"), "r") as file:
tomo_alignment_fit[1][0] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Ay.txt"), "r") as file:
# tomo_alignment_fit[1][0] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_By.txt"), "r") as file:
tomo_alignment_fit[1][1] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_By.txt"), "r") as file:
# tomo_alignment_fit[1][1] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Cy.txt"), "r") as file:
tomo_alignment_fit[1][2] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Cy.txt"), "r") as file:
# tomo_alignment_fit[1][2] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Ay3.txt"), "r") as file:
tomo_alignment_fit[1][3] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Ay3.txt"), "r") as file:
# tomo_alignment_fit[1][3] = file.readline()
# with open(os.path.join(dir_path, "ptychotomoalign_Cy3.txt"), "r") as file:
# tomo_alignment_fit[1][4] = file.readline()
params = dev.omny_xray_gui.fit_params_x.get()
#amplitude
tomo_alignment_fit[0][0] = params['SineModel_0_amplitude']
#phase
tomo_alignment_fit[0][1] = params['SineModel_0_shift']
#offset
tomo_alignment_fit[0][2] = params['LinearModel_1_intercept']
print("applying vertical default values from mirror calibration, not from fit!")
tomo_alignment_fit[1][0] = 0
tomo_alignment_fit[1][1] = 0
tomo_alignment_fit[1][2] = 0
tomo_alignment_fit[1][3] = 0
tomo_alignment_fit[1][4] = 0
with open(os.path.join(dir_path, "ptychotomoalign_Cy3.txt"), "r") as file:
tomo_alignment_fit[1][4] = file.readline()
print("New alignment parameters loaded:")
print(
@@ -1157,18 +1173,12 @@ class Flomni(
super().__init__()
self.client = client
self.device_manager = client.device_manager
self.check_shutter = False
self.check_light_available = False
self.check_fofb = False
self._check_msgs = []
self.tomo_id = -1
self.special_angles = []
self.special_angle_repeats = 20
self.special_angle_tolerance = 20
self._current_special_angles = []
self._beam_is_okay = True
self._stop_beam_check_event = None
self.beam_check_thread = None
self.corr_pos_y = []
self.corr_angle_y = []
self.corr_pos_y_2 = []
@@ -1182,6 +1192,8 @@ class Flomni(
self.progress["angle"] = 0
self.progress["tomo_type"] = 0
self.OMNYTools = OMNYTools(self.client)
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
self.align = XrayEyeAlign(self.client, self)
self.set_client(client)
@@ -1213,27 +1225,6 @@ class Flomni(
def axis_id_to_numeric(self, axis_id) -> int:
return ord(axis_id.lower()) - 97
def get_beamline_checks_enabled(self):
print(
f"Shutter: {self.check_shutter}\nFOFB: {self.check_fofb}\nLight available:"
f" {self.check_light_available}"
)
@property
def beamline_checks_enabled(self):
return {
"shutter": self.check_shutter,
"fofb": self.check_fofb,
"light available": self.check_light_available,
}
@beamline_checks_enabled.setter
def beamline_checks_enabled(self, val: bool):
self.check_shutter = val
self.check_light_available = val
self.check_fofb = val
self.get_beamline_checks_enabled()
def set_special_angles(self, angles: list, repeats: int = 20, tolerance: float = 0.5):
"""Set the special angles for a tomo
@@ -1377,6 +1368,7 @@ class Flomni(
@ptycho_reconstruct_foldername.setter
def ptycho_reconstruct_foldername(self, val: str):
self.client.set_global_var("ptycho_reconstruct_foldername", val)
self.reconstructor.folder_name = val # keep reconstructor in sync
@property
def tomo_angle_stepsize(self):
@@ -1484,7 +1476,6 @@ class Flomni(
if 0 <= angle < 180.05:
print(f"Starting flOMNI scan for angle {angle}")
while not successful:
self._start_beam_check()
try:
start_scan_number = bec.queue.next_scan_number
self.tomo_scan_projection(angle)
@@ -1497,11 +1488,9 @@ class Flomni(
error_caught = True
else:
raise exc
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
#todo here was if blchk success, then setting to success true
successful = True
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, 0)
@@ -1590,7 +1579,7 @@ class Flomni(
print(f"Starting flOMNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
self._start_beam_check()
self.bl_chk._bl_chk_start()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
@@ -1613,10 +1602,10 @@ class Flomni(
else:
raise exc
if self._was_beam_okay() and not error_caught:
if self.bl_chk._bl_chk_stop() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
self.bl_chk._bl_chk_wait_until_recovered()
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
@@ -1768,13 +1757,15 @@ class Flomni(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
):
"""Add a sample to the omny sample database. This also retrieves the tomo id."""
subprocess.run(
f"wget --user=omny --password=samples -q -O /tmp/currsamplesnr.txt 'https://omny.web.psi.ch/samples/newmeasurement.php?sample={samplename}&date={date}&eaccount={eaccount}&scannr={scan_number}&setup={setup}&additional={sample_additional_info}&user={user}'",
shell=True,
return self.tomo_id_manager.register(
sample_name=samplename,
date=date,
eaccount=eaccount,
scan_number=scan_number,
setup=setup,
additional_info=sample_additional_info,
user=user,
)
with open("/tmp/currsamplesnr.txt") as tomo_number_file:
tomo_number = int(tomo_number_file.read())
return tomo_number
def _at_each_angle(self, angle: float) -> None:
if "flomni_at_each_angle" in builtins.__dict__:
@@ -1836,19 +1827,11 @@ class Flomni(
def tomo_reconstruct(self, base_path="~/Data10/specES1"):
"""write the tomo reconstruct file for the reconstruction queue"""
bec = builtins.__dict__.get("bec")
base_path = os.path.expanduser(base_path)
ptycho_queue_path = Path(os.path.join(base_path, self.ptycho_reconstruct_foldername))
ptycho_queue_path.mkdir(parents=True, exist_ok=True)
# pylint: disable=undefined-variable
last_scan_number = bec.queue.next_scan_number - 1
ptycho_queue_file = os.path.abspath(
os.path.join(ptycho_queue_path, f"scan_{last_scan_number:05d}.dat")
self.reconstructor.write(
scan_list=self._current_scan_list,
next_scan_number=bec.queue.next_scan_number,
base_path=base_path,
)
with open(ptycho_queue_file, "w") as queue_file:
scans = " ".join([str(scan) for scan in self._current_scan_list])
queue_file.write(f"p.scan_number {scans}\n")
queue_file.write("p.check_nextscan_started 1\n")
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
@@ -2099,4 +2082,4 @@ if __name__ == "__main__":
builtins.__dict__["bec"] = bec
builtins.__dict__["umv"] = umv
flomni = Flomni(bec)
flomni.start_x_ray_eye_alignment()
flomni.start_x_ray_eye_alignment()

View File

@@ -1,14 +1,15 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class flomniGuiToolsError(Exception):
@@ -16,10 +17,17 @@ class flomniGuiToolsError(Exception):
class flomniGuiTools:
GUI_RPC_TIMEOUT = 20
def __init__(self):
self.text_box = None
self.progressbar = None
self.flomni_window = None
self.xeyegui = None
self.pdf_viewer = None
self.idle_text_box = None
self.camera_gripper_image = None
self.camera_overview_image = None
def set_client(self, client):
self.client = client
@@ -27,9 +35,10 @@ class flomniGuiTools:
def flomnigui_show_gui(self):
if "flomni" in self.gui.windows:
self.gui.flomni.show()
self.flomni_window = self.gui.windows["flomni"]
self.gui.flomni.raise_window()
else:
self.gui.new("flomni")
self.flomni_window = self.gui.new("flomni", timeout=self.GUI_RPC_TIMEOUT)
def flomnigui_stop_gui(self):
self.gui.flomni.hide()
@@ -37,94 +46,111 @@ class flomniGuiTools:
def flomnigui_raise(self):
self.gui.flomni.raise_window()
# def flomnigui_show_xeyealign(self):
# self.flomnigui_show_gui()
# if self.xeyegui is None:
# self.flomnigui_remove_all_docks()
# self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# # start live
# if not dev.cam_xeye.live_mode:
# dev.cam_xeye.live_mode = True
def flomnigui_show_xeyealign(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
if self._flomnigui_is_missing("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# start live
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
self.xeyegui = self.gui.flomni.new(
"XRayEye", object_name="xrayeye", timeout=self.GUI_RPC_TIMEOUT
)
# start live
if not dev.cam_xeye.live_mode_enabled.get():
dev.cam_xeye.live_mode_enabled.put(True)
self.xeyegui.switch_tab("alignment")
def flomnigui_show_xeyealign_fittab(self):
self.flomnigui_show_gui()
if self._flomnigui_is_missing("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new(
"XRayEye", object_name="xrayeye", timeout=self.GUI_RPC_TIMEOUT
)
self.xeyegui.switch_tab("fit")
def _flomnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui,"flomni"):
if hasattr(self.gui.flomni,attribute_name):
if hasattr(self.gui, "flomni"):
if hasattr(self.gui.flomni, attribute_name):
return False
return True
def _flomnigui_is_missing(self, attribute_name):
widget = getattr(self, attribute_name, None)
if widget is None:
return True
if hasattr(widget, "_is_deleted") and widget._is_deleted():
return True
return False
def flomnigui_show_cameras(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"):
if self._flomnigui_is_missing("camera_gripper_image") or self._flomnigui_is_missing(
"camera_overview_image"
):
self.flomnigui_remove_all_docks()
camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
self.camera_gripper_image = self.gui.flomni.new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
camera_gripper_image.image(("cam_flomni_gripper", "preview"))
camera_gripper_image.lock_aspect_ratio = True
camera_gripper_image.enable_fps_monitor = True
camera_gripper_image.enable_toolbar = False
camera_gripper_image.outer_axes = False
camera_gripper_image.inner_axes = False
self.camera_gripper_image.image(device="cam_flomni_gripper", signal="preview")
self.camera_gripper_image.lock_aspect_ratio = True
self.camera_gripper_image.enable_fps_monitor = True
self.camera_gripper_image.enable_toolbar = False
self.camera_gripper_image.outer_axes = False
self.camera_gripper_image.inner_axes = False
dev.cam_flomni_gripper.start_live_mode()
else:
print("Cannot open camera_gripper. Device does not exist.")
camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
self.camera_overview_image = self.gui.flomni.new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
camera_overview_image.image(("cam_flomni_overview", "preview"))
camera_overview_image.lock_aspect_ratio = True
camera_overview_image.enable_fps_monitor = True
camera_overview_image.enable_toolbar = False
camera_overview_image.outer_axes = False
camera_overview_image.inner_axes = False
self.camera_overview_image.image(device="cam_flomni_overview", signal="preview")
self.camera_overview_image.lock_aspect_ratio = True
self.camera_overview_image.enable_fps_monitor = True
self.camera_overview_image.enable_toolbar = False
self.camera_overview_image.outer_axes = False
self.camera_overview_image.inner_axes = False
dev.cam_flomni_overview.start_live_mode()
else:
print("Cannot open camera_overview. Device does not exist.")
def flomnigui_remove_all_docks(self):
#dev.cam_flomni_overview.stop_live_mode()
#dev.cam_flomni_gripper.stop_live_mode()
#dev.cam_xeye.live_mode = False
self.gui.flomni.delete_all()
# dev.cam_flomni_overview.stop_live_mode()
# dev.cam_flomni_gripper.stop_live_mode()
# dev.cam_xeye.live_mode = False
if "flomni" in self.gui.windows:
self.gui.flomni.delete_all(timeout=self.GUI_RPC_TIMEOUT)
self.progressbar = None
self.text_box = None
self.xeyegui = None
self.pdf_viewer = None
self.idle_text_box = None
self.camera_gripper_image = None
self.camera_overview_image = None
def flomnigui_idle(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("idle_text_box"):
if self._flomnigui_is_missing("idle_text_box"):
self.flomnigui_remove_all_docks()
idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
self.idle_text_box = self.gui.flomni.new("TextBox")
text = (
"<pre>"
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n"
+ "/ .-'| |' .-. '| `.' || ,'.| || | \n"
+ "| `-,| || | | || |'.'| || |' ' || | \n"
+ "| .-'| |' '-' '| | | || | ` || | \n"
+ "`--' `--' `-----' `--' `--'`--' `--'`--' \n"
+ "██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗\n"
+ "██╔══██╗██╔════╝██╔════╝ ██╔════╝██║ ██╔═══██╗████╗ ████║████╗ ██║██║\n"
+ "██████╔╝█████╗ ██║ █████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║██║\n"
+ "██╔══██╗██╔══╝ ██║ ██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║██║\n"
+ "██████╔╝███████╗╚██████╗ ██║ ███████╗╚██████╔╝██║ ╚═╝ ██║██║ ╚████║██║\n"
+ "╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝\n"
+ "</pre>"
)
idle_text_box.set_html_text(text)
self.idle_text_box.set_html_text(text)
def flomnigui_docs(self, filename: str | None = None):
import csaxs_bec
from pathlib import Path
print("The general flOMNI documentation is at \nhttps://sls-csaxs.readthedocs.io/en/latest/user/ptychography/flomni.html#user-ptychography-flomni")
print(
"The general flOMNI documentation is at \nhttps://sls-csaxs.readthedocs.io/en/latest/user/ptychography/flomni.html#user-ptychography-flomni"
)
csaxs_bec_basepath = Path(csaxs_bec.__file__).parent
docs_folder = (
csaxs_bec_basepath /
"bec_ipython_client" / "plugins" / "flomni" / "docs"
)
docs_folder = csaxs_bec_basepath / "bec_ipython_client" / "plugins" / "flomni" / "docs"
if not docs_folder.is_dir():
raise NotADirectoryError(f"Docs folder not found: {docs_folder}")
@@ -157,15 +183,14 @@ class flomniGuiTools:
# --- GUI handling (active existence check) ----------------------------
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("PdfViewerWidget"):
if self._flomnigui_is_missing("pdf_viewer"):
self.flomnigui_remove_all_docks()
self.pdf_viewer = self.gui.flomni.new(widget="PdfViewerWidget")
# --- Load PDF ---------------------------------------------------------
self.pdf_viewer.PdfViewerWidget.load_pdf(str(pdf_file.resolve()))
self.pdf_viewer.load_pdf(str(pdf_file.resolve()))
print(f"\nLoaded: {pdf_file.name}\n")
def _flomnicam_check_device_exists(self, device):
try:
device
@@ -176,29 +201,21 @@ class flomniGuiTools:
def flomnigui_show_progress(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("progressbar"):
if self._flomnigui_is_missing("progressbar"):
self.flomnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
self.progressbar.enable_auto_updates(False)
# Set precision for the self.progressbar display
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
# Setting multiple rigns with different values
self.progressbar.set_number_of_bars(3)
self.progressbar.rings[0].set_update("manual")
self.progressbar.rings[1].set_update("manual")
self.progressbar.rings[2].set_update("scan")
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.flomni.new(name="progress_text").new("TextBox")
self.progressbar = self.gui.flomni.new("RingProgressBar")
# Setting multiple rings with different values
self.progressbar.add_ring().set_update("manual")
self.progressbar.add_ring().set_update("manual")
self.progressbar.add_ring().set_update("scan")
self._flomnigui_update_progress()
def _flomnigui_update_progress(self):
main_progress_ring = self.progressbar.rings[0]
subtomo_progress_ring = self.progressbar.rings[1]
if self.progressbar is not None:
progress = self.progress["projection"] / self.progress["total_projections"] * 100
subtomo_progress = (
@@ -206,10 +223,11 @@ class flomniGuiTools:
/ self.progress["subtomo_total_projections"]
* 100
)
self.progressbar.set_value([progress, subtomo_progress, 0])
if self.text_box is not None:
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
self.text_box.set_plain_text(text)
main_progress_ring.set_value(progress)
subtomo_progress_ring.set_value(subtomo_progress)
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
self.progressbar.set_center_label(text)
if __name__ == "__main__":
@@ -220,6 +238,7 @@ if __name__ == "__main__":
client.start()
client.gui = BECGuiClient()
flomni_gui = flomniGuiTools(client)
flomni_gui = flomniGuiTools()
flomni_gui.set_client(client)
flomni_gui.flomnigui_show_gui()
flomni_gui.flomnigui_show_progress()

View File

@@ -5,16 +5,22 @@ import os
import time
from typing import TYPE_CHECKING
import numpy as np
from bec_lib import bec_logger
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
logger = bec_logger.logger
# import builtins to avoid linter errors
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
if TYPE_CHECKING:
from bec_ipython_client.plugins.flomni import Flomni
@@ -22,7 +28,7 @@ if TYPE_CHECKING:
class XrayEyeAlign:
# pixel calibration, multiply to get mm
labview=False
test_wo_movements = True
PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning
def __init__(self, client, flomni: Flomni) -> None:
@@ -34,209 +40,190 @@ class XrayEyeAlign:
self.flomni.reset_correction()
self.flomni.reset_tomo_alignment_fit()
@property
def gui(self):
return self.flomni.xeyegui
def _reset_init_values(self):
self.shift_xy = [0, 0]
self._xray_fov_xy = [0, 0]
def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self, keep_shutter_open=False):
def update_frame(self,keep_shutter_open=False):
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
if not self.labview:
self.flomni.flomnigui_show_xeyealign()
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
if not dev.cam_xeye.live_mode_enabled.get():
dev.cam_xeye.live_mode_enabled.put(True)
epics_put("XOMNYI-XEYE-ACQ:0", 1)
if self.labview:
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
self.gui.on_live_view_enabled(True)
fshopen()
if self.labview:
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
dev.omnyfsh.fshopen()
time.sleep(0.5)
# stop live view
if not keep_shutter_open:
epics_put("XOMNYI-XEYE-ACQ:0", 0)
self.gui.on_live_view_enabled(False)
time.sleep(0.1)
fshclose()
print("got new frame")
dev.omnyfsh.fshclose()
print("Received new frame.")
else:
print("Staying in live view, shutter is and remains open!")
def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable
umv(self.device_manager.devices.fsamroy, val)
if not self.test_wo_movements:
umv(self.device_manager.devices.fsamroy, val)
def get_tomo_angle(self):
return self.device_manager.devices.fsamroy.readback.get()
def update_fov(self, k: int):
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
self._xray_fov_xy[0] = max(
getattr(dev.omny_xray_gui, f"width_x_{k}").get(), self._xray_fov_xy[0]
)
self._xray_fov_xy[1] = max(
getattr(dev.omny_xray_gui, f"width_y_{k}").get(), self._xray_fov_xy[1]
)
@property
def movement_buttons_enabled(self):
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
@movement_buttons_enabled.setter
def movement_buttons_enabled(self, enabled: bool):
enabled = int(enabled)
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
def movement_buttons_enabled(self, enablex: bool, enabley: bool):
self.gui.on_motors_enable(enablex, enabley)
def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
print(f"In alginment GUI: {msg}")
self.gui.user_message = msg
def align(self,keep_shutter_open=False):
def align(self, keep_shutter_open=False):
self.flomni.flomnigui_show_xeyealign()
if not keep_shutter_open:
print("This routine can be called with paramter keep_shutter_open=True to keep the shutter always open")
print(
"This routine can be called with paramter keep_shutter_open=True to keep the shutter always open"
)
self.send_message("Getting things ready. Please wait...")
#potential unresolved movement requests to zero
epics_put("XOMNYI-XEYE-MVX:0", 0)
epics_put("XOMNYI-XEYE-MVY:0", 0)
self.gui.enable_submit_button(False)
# Initialize xray align device
# clear potential pending movement requests
dev.omny_xray_gui.mvx.set(0)
dev.omny_xray_gui.mvy.set(0)
# reset submit channel
dev.omny_xray_gui.submit.set(0)
self.movement_buttons_enabled(False, False)
# reset shift xy and fov params
self._reset_init_values()
self.flomni.lights_off()
self.flomni.flomnigui_show_xeyealign()
self.flomni.flomnigui_raise()
if not self.test_wo_movements:
self.tomo_rotate(0)
self.tomo_rotate(0)
epics_put("XOMNYI-XEYE-ANGLE:0", 0)
self.flomni.feye_in()
self.flomni.feye_in()
self.flomni.laser_tracker_on()
self.flomni.feedback_enable_with_reset()
# disable movement buttons
self.movement_buttons_enabled = False
self.movement_buttons_enabled(False, False)
sample_name = self.flomni.sample_get_name(0)
epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", sample_name)
self.gui.sample_name = sample_name
# this makes sure we are in a defined state
self.flomni.feedback_disable()
epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
if not self.test_wo_movements:
self.flomni.fosa_out()
self.flomni.fosa_out()
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in - 0.25)
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in - 0.25)
self.flomni.ffzp_in()
self.flomni.ffzp_in()
self.update_frame(keep_shutter_open)
# enable submit buttons
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-STEP:0", 0)
self.gui.enable_submit_button(True)
dev.omny_xray_gui.step.set(0).wait()
self.send_message("Submit center value of FZP.")
k = 0
while True:
if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") / 2 * self.PIXEL_CALIBRATION # in mm
self.alignment_values[k] = val_x
if dev.omny_xray_gui.submit.get() == 1:
self.alignment_values[k] = (
getattr(dev.omny_xray_gui, f"xval_x_{k}").get() / 2 * self.PIXEL_CALIBRATION
) # in mm
print(f"Clicked position {k}: x {self.alignment_values[k]}")
rtx_position = dev.rtx.readback.get() / 1000
print(f"Current rtx position {rtx_position}")
self.alignment_values[k] -= rtx_position
print(f"Corrected position {k}: x {self.alignment_values[k]}")
# reset submit channel
dev.omny_xray_gui.submit.set(0)
if k == 0: # received center value of FZP
self.send_message("please wait ...")
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.movement_buttons_enabled(False, False)
self.gui.enable_submit_button(False)
self.flomni.feedback_disable()
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in)
if not self.test_wo_movements:
fsamx_in = self.flomni._get_user_param_safe("fsamx", "in")
umv(dev.fsamx, fsamx_in)
self.flomni.foptics_out()
self.flomni.foptics_out()
self.flomni.feedback_disable()
umv(dev.fsamx, fsamx_in - 0.25)
time.sleep(0.5)
if self.labview:
self.update_frame(keep_shutter_open)
epics_put("XOMNYI-XEYE-RECBG:0", 1)
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
time.sleep(0.5)
print("waiting for background frame...")
umv(dev.fsamx, fsamx_in)
time.sleep(0.5)
self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open)
self.send_message("Adjust sample height and submit center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled = True
self.send_message("Step 1/5: Adjust sample height and submit center")
self.gui.enable_submit_button(True)
self.movement_buttons_enabled(True, True)
elif 1 <= k < 5: # received sample center value at samroy 0 ... 315
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
self.gui.enable_submit_button(False)
self.movement_buttons_enabled(False, False)
umv(dev.rtx, 0)
self.tomo_rotate(k * 45)
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
dev.omny_xray_gui.angle.set(self.get_tomo_angle())
self.update_frame(keep_shutter_open)
self.send_message("Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
self.send_message(f"Step {k+1}/5: Submit sample center")
self.gui.enable_submit_button(True)
self.movement_buttons_enabled(True, False)
self.update_fov(k)
elif k == 5: # received sample center value at samroy 270 and done
self.send_message("done...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1) # disable submit button
self.movement_buttons_enabled = False
self.gui.enable_submit_button(False)
self.movement_buttons_enabled(False, False)
self.update_fov(k)
break
k += 1
epics_put("XOMNYI-XEYE-STEP:0", k)
dev.omny_xray_gui.step.set(k)
_xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
_xrayeyalignmvx = dev.omny_xray_gui.mvx.get()
if _xrayeyalignmvx != 0:
umvr(dev.rtx, _xrayeyalignmvx)
print(f"Current rtx position {dev.rtx.readback.get() / 1000}")
epics_put("XOMNYI-XEYE-MVX:0", 0)
if k > 0:
epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000)
time.sleep(3)
dev.omny_xray_gui.mvx.set(0)
self.update_frame(keep_shutter_open)
if k < 2:
# allow movements, store movements to calculate center
_xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
_xrayeyalignmvy = dev.omny_xray_gui.mvy.get()
if _xrayeyalignmvy != 0:
self.flomni.feedback_disable()
umvr(dev.fsamy, _xrayeyalignmvy / 1000)
if not self.test_wo_movements:
umvr(dev.fsamy, _xrayeyalignmvy / 1000)
time.sleep(2)
epics_put("XOMNYI-XEYE-MVY:0", 0)
dev.omny_xray_gui.mvy.set(0)
self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open)
time.sleep(0.2)
time.sleep(0.1)
self.write_output()
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
@@ -246,22 +233,17 @@ class XrayEyeAlign:
umv(dev.rtx, 0)
# free camera
if self.labview:
epics_put("XOMNYI-XEYE-ACQ:0", 2)
if keep_shutter_open and not self.labview:
if self.flomni.OMNYTools.yesno("Close the shutter now?","y"):
fshclose()
epics_put("XOMNYI-XEYE-ACQ:0", 0)
if not self.labview:
self.flomni.flomnigui_idle()
if keep_shutter_open:
if self.flomni.OMNYTools.yesno("Close the shutter now?", "y"):
dev.omnyfsh.fshclose()
self.gui.on_live_view_enabled(False)
print("setting 'XOMNYI-XEYE-ACQ:0'")
print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy"
f" = {fovy:.0f} microns"
)
print("Use the matlab routine to FIT the current alignment...")
print("Check the fit in the GUI...")
print("Then LOAD ALIGNMENT PARAMETERS by running flomni.read_alignment_offset()\n")
@@ -269,9 +251,32 @@ class XrayEyeAlign:
file = os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues")
if not os.path.exists(file):
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, "w") as alignment_values_file:
alignment_values_file.write("angle\thorizontal\n")
# Initialize an empty list to store fovx values
fovx_list = []
fovx_offsets = np.zeros(5) # holds offsets for k = 1..5
for k in range(1, 6):
fovx_offset = self.alignment_values[0] - self.alignment_values[k]
fovx_offsets[k - 1] = fovx_offset # store in array
fovx_x = (k - 1) * 45
fovx_list.append([fovx_x, fovx_offset * 1000]) # Append the data to the list
print(f"Writing to file new alignment: number {k}, value x {fovx_offset}")
alignment_values_file.write(f"{(k-1)*45}\t{fovx_offset*1000}\n")
alignment_values_file.write(f"{fovx_x}\t{fovx_offset * 1000}\n")
# Now build final numpy array:
data = np.array(
[
[0, 45, 90, 135, 180], # angles
fovx_offsets * 1000, # fovx_offset values
[0, 0, 0, 0, 0],
]
)
self.gui.submit_fit_array(data)
print(f"fit submited with {data}")
# self.flomni.flomnigui_show_xeyealign_fittab()

View File

@@ -1,14 +1,14 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYGuiToolsError(Exception):
@@ -19,7 +19,7 @@ class OMNYGuiTools:
def __init__(self, client):
self.gui = getattr(client, "gui", None)
self.gui_window = self.gui.windows['main'].widget
self.gui_window = self.gui.windows["main"].widget
self.fig200 = None
self.fig201 = None
self.fig202 = None
@@ -81,16 +81,12 @@ class OMNYGuiTools:
pass
text = (
"<pre>"
+ " ,o888888o. ,8. ,8. b. 8 `8.`8888. ,8' \n"
+ " . 8888 `88. ,888. ,888. 888o. 8 `8.`8888. ,8' \n"
+ ",8 8888 `8b .`8888. .`8888. Y88888o. 8 `8.`8888. ,8' \n"
+ "88 8888 `8b ,8.`8888. ,8.`8888. .`Y888888o. 8 `8.`8888.,8' \n"
+ "88 8888 88 ,8'8.`8888,8^8.`8888. 8o. `Y888888o. 8 `8.`88888' \n"
+ "88 8888 88 ,8' `8.`8888' `8.`8888. 8`Y8o. `Y88888o8 `8. 8888 \n"
+ "88 8888 ,8P ,8' `8.`88' `8.`8888. 8 `Y8o. `Y8888 `8 8888 \n"
+ "`8 8888 ,8P ,8' `8.`' `8.`8888. 8 `Y8o. `Y8 8 8888 \n"
+ " ` 8888 ,88' ,8' `8 `8.`8888. 8 `Y8o.` 8 8888 \n"
+ " `8888888P' ,8' ` `8.`8888. 8 `Yo 8 8888 \n"
+ "██████╗ ███████╗ ██████╗ ██████╗ ███╗ ███╗███╗ ██╗██╗ ██╗\n"
+ "██╔══██╗██╔════╝██╔════╝ ██╔═══██╗████╗ ████║████╗ ██║╚██╗ ██╔╝\n"
+ "██████╔╝█████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║ ╚████╔╝ \n"
+ "██╔══██╗██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║ ╚██╔╝ \n"
+ "██████╔╝███████╗╚██████╗ ╚██████╔╝██║ ╚═╝ ██║██║ ╚████║ ██║ \n"
+ "╚═════╝ ╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝ \n"
+ "</pre>"
)
self.idle_text_box.set_html_text(text)
@@ -137,7 +133,9 @@ class OMNYGuiTools:
if self.progressbar is None:
self.omnygui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui_window.add_dock(name="progress").add_widget("RingProgressBar")
self.progressbar = self.gui_window.add_dock(name="progress").add_widget(
"RingProgressBar"
)
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value

View File

@@ -27,9 +27,10 @@ logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYInitError(Exception):
pass

View File

@@ -1,25 +1,36 @@
import time
import numpy as np
import sys
import termios
import tty
import builtins
import datetime
import fcntl
import os
import builtins
import socket
import subprocess
import sys
import termios
import threading
import time
import tty
from pathlib import Path
import epics
import numpy as np
from bec_lib import bec_logger
from rich import box
from rich.console import Console
from rich.table import Table
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
def umvr(*args):
return scans.umv(*args, relative=True)
class OMNYToolsError(Exception):
pass
@@ -108,24 +119,20 @@ class OMNYTools:
next1, next2 = sys.stdin.read(2)
if next1 == "[":
if next2 == "A":
# print("up")
if dev2 != "none":
umvr(dev2, step2)
if special_command != "none":
special_command()
elif next2 == "B":
# print(" down")
if dev2 != "none":
umvr(dev2, -step2)
if special_command != "none":
special_command()
elif next2 == "C":
# print("right")
umvr(dev1, step1)
if special_command != "none":
special_command()
elif next2 == "D":
# print("left")
umvr(dev1, -step1)
if special_command != "none":
special_command()
@@ -141,13 +148,135 @@ class OMNYTools:
step2 = step2 / 2
print(f"\rHalf step size. New step size: {step1}, {step2}\r")
except IOError:
# No input available, keep looping
pass
# Sleep for a short period to avoid high CPU usage
time.sleep(0.02)
finally:
# Restore the terminal to its original state
termios.tcsetattr(fd, termios.TCSADRAIN, old_term)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)
import socket
class PtychoReconstructor:
"""Writes ptychography reconstruction queue files after each scan projection.
An external reconstruction engine monitors the queue folder and picks
up .dat files as they are written.
Usage:
reconstructor = PtychoReconstructor(folder_name="reconstruction_queue")
reconstructor.write(
scan_list=[1023, 1024],
next_scan_number=1025,
base_path="~/data/raw",
)
"""
def __init__(self, folder_name: str = "reconstruction_queue"):
self.folder_name = folder_name
def _accounts_match(self) -> bool:
"""Check if bec.active_account matches the current system user (p vs e prefix)."""
try:
bec = builtins.__dict__.get("bec")
active = bec.active_account # e.g. "p23092"
system_user = os.getenv("USER") or os.getlogin() # e.g. "e23092"
print(f"Active server account {active}, BEC client account {system_user}.")
return active[1:] == system_user[1:]
except Exception:
logger.warning("Failed to compare active account to system user.")
return False
def write(self, scan_list: list, next_scan_number: int, base_path: str = "~/data/raw/analysis/"):
"""Write a reconstruction queue file for the given scan list.
Args:
scan_list (list): Scan numbers belonging to this projection
(may contain multiple entries when stitching).
next_scan_number (int): The current next scan number, used to
name the queue file.
base_path (str): Root path under which the queue folder lives.
"""
if not self._accounts_match():
logger.warning("Active BEC account does not match system user — skipping queue file write.")
return
base_path = os.path.expanduser(base_path)
queue_path = Path(os.path.join(base_path, self.folder_name))
queue_path.mkdir(parents=True, exist_ok=True)
last_scan_number = next_scan_number - 1
queue_file = os.path.abspath(
os.path.join(queue_path, f"scan_{last_scan_number:05d}.dat")
)
with open(queue_file, "w") as f:
scans = " ".join(str(s) for s in scan_list)
f.write(f"p.scan_number {scans}\n")
f.write("p.check_nextscan_started 1\n")
class TomoIDManager:
"""Registers a tomography measurement in the OMNY sample database
and returns its assigned tomo ID.
Usage:
id_manager = TomoIDManager()
tomo_id = id_manager.register(
sample_name="my_sample",
date="2024-03-08",
eaccount="e12345",
scan_number=1001,
setup="lamni",
additional_info="test info",
user="BEC",
)
"""
OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
OMNY_USER = "omny"
OMNY_PASSWORD = "samples"
TMP_FILE = "/tmp/currsamplesnr.txt"
def register(
self,
sample_name: str,
date: str,
eaccount: str,
scan_number: int,
setup: str,
additional_info: str,
user: str,
) -> int:
"""Register a new measurement and return the assigned tomo ID.
Args:
sample_name (str): Name of the sample.
date (str): Date string (e.g. "2024-03-08").
eaccount (str): E-account identifier.
scan_number (int): First scan number of the measurement.
setup (str): Setup name (e.g. "lamni").
additional_info (str): Any additional sample information.
user (str): User name.
Returns:
int: The tomo ID assigned by the OMNY database.
"""
url = (
f"{self.OMNY_URL}"
f"?sample={sample_name}"
f"&date={date}"
f"&eaccount={eaccount}"
f"&scannr={scan_number}"
f"&setup={setup}"
f"&additional={additional_info}"
f"&user={user}"
)
subprocess.run(
f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
f" -q -O {self.TMP_FILE} '{url}'",
shell=True,
)
with open(self.TMP_FILE) as f:
return int(f.read())

View File

@@ -16,8 +16,10 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fsh
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYTransferError(Exception):

View File

@@ -13,8 +13,10 @@ logger = bec_logger.logger
# import builtins to avoid linter errors
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
if TYPE_CHECKING:
from bec_ipython_client.plugins.omny import OMNY

View File

@@ -30,29 +30,74 @@ logger = bec_logger.logger
logger.info("Using the cSAXS startup script.")
# pylint: disable=import-error
_args = _main_dict["args"]
_session_name = "cSAXS"
if _args.session.lower() == "lamni":
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
from csaxs_bec.bec_ipython_client.plugins.LamNI import *
_session_name = "LamNI"
lamni = LamNI(bec)
logger.success("LamNI session loaded.")
elif _args.session.lower() == "csaxs":
print("Loading cSAXS session")
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
logger.success("cSAXS session loaded.")
from csaxs_bec.bec_ipython_client.plugins.tool_box.debug_tools import DebugTools
debug = DebugTools()
logger.success("Debug tools loaded. Use 'debug' to access them.")
# pylint: disable=import-error
_args = _main_dict["args"]
_session_name = "cSAXS"
print("Loading cSAXS session")
from csaxs_bec.bec_ipython_client.plugins.cSAXS.cSAXS import cSAXS
csaxs = cSAXS(bec)
logger.success("cSAXS session loaded.")
if _args.session.lower() == "lamni":
from csaxs_bec.bec_ipython_client.plugins.LamNI import LamNI
_session_name = "LamNI"
lamni = LamNI(bec)
logger.success("LamNI session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ██╗ █████╗ ███╗ ███╗███╗ ██╗██╗
██╔══██╗██╔════╝██╔════╝ ██║ ██╔══██╗████╗ ████║████╗ ██║██║
██████╔╝█████╗ ██║ ██║ ███████║██╔████╔██║██╔██╗ ██║██║
██╔══██╗██╔══╝ ██║ ██║ ██╔══██║██║╚██╔╝██║██║╚██╗██║██║
██████╔╝███████╗╚██████╗ ███████╗██║ ██║██║ ╚═╝ ██║██║ ╚████║██║
╚═════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
B E C L a m N I
""")
elif _args.session.lower() == "omny":
from csaxs_bec.bec_ipython_client.plugins.flomni import OMNY
_session_name = "OMNY"
omny = OMNY(bec)
logger.success("OMNY session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ██████╗ ███╗ ███╗███╗ ██╗██╗ ██╗
██╔══██╗██╔════╝██╔════╝ ██╔═══██╗████╗ ████║████╗ ██║╚██╗ ██╔╝
██████╔╝█████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║ ╚████╔╝
██╔══██╗██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║ ╚██╔╝
██████╔╝███████╗╚██████╗ ╚██████╔╝██║ ╚═╝ ██║██║ ╚████║ ██║
╚═════╝ ╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝
B E C O M N Y
""")
elif _args.session.lower() == "flomni":
from csaxs_bec.bec_ipython_client.plugins.flomni import Flomni
_session_name = "flomni"
flomni = Flomni(bec)
logger.success("flomni session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗
██╔══██╗██╔════╝██╔════╝ ██╔════╝██║ ██╔═══██╗████╗ ████║████╗ ██║██║
██████╔╝█████╗ ██║ █████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║██║
██╔══██╗██╔══╝ ██║ ██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║██║
██████╔╝███████╗╚██████╗ ██║ ███████╗╚██████╔╝██║ ╚═╝ ██║██║ ╚████║██║
╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
B E C f l O M N I
""")
# SETUP BEAMLINE INFO
from bec_ipython_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo
@@ -66,3 +111,34 @@ bec._beamline_mixin._bl_info_register(OperatorInfo)
# SETUP PROMPTS
bec._ip.prompts.session_name = _session_name
bec._ip.prompts.status = 1
# ACCOUNT MISMATCH CHECK
import os
def _check_account_mismatch():
try:
active = bec.active_account # e.g. "p23092"
system_user = os.getenv("USER") or os.getlogin() # e.g. "e23092"
if active[1:] != system_user[1:]:
print(f"""
\033[91m\033[1m
██╗ ██╗ █████╗ ██████╗ ███╗ ██╗██╗███╗ ██╗ ██████╗
██║ ██║██╔══██╗██╔══██╗████╗ ██║██║████╗ ██║██╔════╝
██║ █╗ ██║███████║██████╔╝██╔██╗ ██║██║██╔██╗ ██║██║ ███╗
██║███╗██║██╔══██║██╔══██╗██║╚██╗██║██║██║╚██╗██║██║ ██║
╚███╔███╔╝██║ ██║██║ ██║██║ ╚████║██║██║ ╚████║╚██████╔╝
╚══╝╚══╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝╚═╝ ╚═══╝ ╚═════╝
ACCOUNT MISMATCH DETECTED!
BEC active account : {active}
System user : {system_user}
Data read and written by the BEC client does not match the data account!
Please verify you are logged in with the correct account.
\033[0m""")
except Exception:
logger.warning("Failed to verify account match.")
if _args.session.lower() == "lamni" or _args.session.lower() == "flomni" or _args.session.lower() == "omny":
_check_account_mismatch()

View File

@@ -13,69 +13,10 @@ logger = bec_logger.logger
_Widgets = {
"OmnyAlignment": "OmnyAlignment",
"XRayEye": "XRayEye",
}
class OmnyAlignment(RPCBase):
@property
@rpc_call
def enable_live_view(self):
"""
None
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
None
"""
@property
@rpc_call
def user_message(self):
"""
None
"""
@user_message.setter
@rpc_call
def user_message(self):
"""
None
"""
@property
@rpc_call
def sample_name(self):
"""
None
"""
@sample_name.setter
@rpc_call
def sample_name(self):
"""
None
"""
@property
@rpc_call
def enable_move_buttons(self):
"""
None
"""
@enable_move_buttons.setter
@rpc_call
def enable_move_buttons(self):
"""
None
"""
class XRayEye(RPCBase):
@rpc_call
def active_roi(self) -> "BaseROI | None":
@@ -83,20 +24,6 @@ class XRayEye(RPCBase):
Return the currently active ROI, or None if no ROI is active.
"""
@property
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@property
@rpc_call
def user_message(self):
@@ -111,6 +38,33 @@ class XRayEye(RPCBase):
None
"""
@rpc_timeout(20)
@rpc_call
def on_live_view_enabled(self, enabled: "bool"):
"""
None
"""
@rpc_timeout(20)
@rpc_call
def on_motors_enable(self, x_enable: "bool", y_enable: "bool"):
"""
Enable/Disable motor controls
Args:
x_enable(bool): enable x motor controls
y_enable(bool): enable y motor controls
"""
@rpc_timeout(20)
@rpc_call
def enable_submit_button(self, enable: "bool"):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
@property
@rpc_call
def sample_name(self):
@@ -139,6 +93,20 @@ class XRayEye(RPCBase):
None
"""
@rpc_timeout(20)
@rpc_call
def switch_tab(self, tab: "str"):
"""
None
"""
@rpc_timeout(20)
@rpc_call
def submit_fit_array(self, fit_array):
"""
None
"""
class XRayEye2DControl(RPCBase):
@rpc_call
@@ -146,3 +114,15 @@ class XRayEye2DControl(RPCBase):
"""
Cleanup the BECConnector
"""
@rpc_call
def attach(self):
"""
None
"""
@rpc_call
def detach(self):
"""
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
"""

View File

@@ -1,140 +0,0 @@
from typing import TypedDict
from bec_widgets.utils.error_popups import SafeSlot
import os
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QWidget, QPushButton, QLineEdit, QLabel, QVBoxLayout
from bec_qthemes import material_icon
from bec_lib.logger import bec_logger
logger = bec_logger.logger
# class OmnyAlignmentUIComponents(TypedDict):
# moveRightButton: QPushButton
# moveLeftButton: QPushButton
# moveUpButton: QPushButton
# moveDownButton: QPushButton
# image: Image
class OmnyAlignment(BECWidget, QWidget):
USER_ACCESS = ["enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter","sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
ui_file = "./omny_alignment.ui"
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.ui.moveRightButton.setText("")
self.ui.moveRightButton.setIcon(
material_icon(icon_name="keyboard_arrow_right", **icon_options)
)
self.ui.moveLeftButton.setText("")
self.ui.moveLeftButton.setIcon(
material_icon(icon_name="keyboard_arrow_left", **icon_options)
)
self.ui.moveUpButton.setText("")
self.ui.moveUpButton.setIcon(
material_icon(icon_name="keyboard_arrow_up", **icon_options)
)
self.ui.moveDownButton.setText("")
self.ui.moveDownButton.setIcon(
material_icon(icon_name="keyboard_arrow_down", **icon_options)
)
self.ui.confirmButton.setText("OK")
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
@property
def enable_live_view(self):
return self.ui.liveViewSwitch.checked
@enable_live_view.setter
def enable_live_view(self, enable:bool):
self.ui.liveViewSwitch.checked = enable
@property
def user_message(self):
return self.ui.messageLineEdit.text()
@user_message.setter
def user_message(self, message:str):
self.ui.messageLineEdit.setText(message)
@property
def sample_name(self):
return self.ui.sampleLineEdit.text()
@sample_name.setter
def sample_name(self, message:str):
self.ui.sampleLineEdit.setText(message)
@SafeSlot(bool)
def on_live_view_enabled(self, enabled:bool):
from bec_widgets.widgets.plots.image.image import Image
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam_xeye")
return
image.disconnect_monitor("cam_xeye")
@property
def enable_move_buttons(self):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
return move_up.isEnabled() and move_down.isEnabled() and move_left.isEnabled() and move_right.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled:bool):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
move_up.setEnabled(enabled)
move_down.setEnabled(enabled)
move_left.setEnabled(enabled)
move_right.setEnabled(enabled)
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
widget = OmnyAlignment()
widget.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['omny_alignment.py']}

View File

@@ -1,125 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>988</width>
<height>821</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="2" column="2">
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="2">
<widget class="QPushButton" name="moveRightButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QPushButton" name="moveLeftButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QPushButton" name="moveUpButton">
<property name="text">
<string>Up</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QPushButton" name="moveDownButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QPushButton" name="confirmButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="2" column="0">
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="1">
<widget class="QLineEdit" name="sampleLineEdit"/>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="messageLineEdit"/>
</item>
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Sample</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Message</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="1" column="0" colspan="3">
<widget class="Image" name="image">
<property name="enable_toolbar" stdset="0">
<bool>false</bool>
</property>
<property name="inner_axes" stdset="0">
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam_xeye</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>
</property>
</widget>
</item>
<item row="0" column="3">
<widget class="ToggleSwitch" name="liveViewSwitch"/>
</item>
<item row="0" column="2">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Live View</string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
</property>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>Image</class>
<extends>QWidget</extends>
<header>image</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1,54 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment import OmnyAlignment
DOM_XML = """
<ui language='c++'>
<widget class='OmnyAlignment' name='omny_alignment'>
</widget>
</ui>
"""
class OmnyAlignmentPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = OmnyAlignment(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(OmnyAlignment.ICON_NAME)
def includeFile(self):
return "omny_alignment"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "OmnyAlignment"
def toolTip(self):
return "OmnyAlignment"
def whatsThis(self):
return self.toolTip()

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment_plugin import OmnyAlignmentPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(OmnyAlignmentPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -4,7 +4,9 @@ from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.utils.rpc_decorator import rpc_timeout
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
@@ -21,7 +23,10 @@ from qtpy.QtWidgets import (
QToolButton,
QVBoxLayout,
QWidget,
QTextEdit,
QTabWidget,
)
import time
logger = bec_logger.logger
CAMERA = ("cam_xeye", "image")
@@ -41,38 +46,38 @@ class XRayEye2DControl(BECWidget, QWidget):
""")
# Up
self.move_up_button = QToolButton(parent=self)
self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
self.move_up_button.setIcon(material_icon("keyboard_double_arrow_up"))
self.root_layout.addWidget(self.move_up_button, 0, 2)
# Up tweak button
self.move_up_tweak_button = QToolButton(parent=self)
self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
self.move_up_tweak_button.setIcon(material_icon("keyboard_arrow_up"))
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
# Left
self.move_left_button = QToolButton(parent=self)
self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
self.move_left_button.setIcon(material_icon("keyboard_double_arrow_left"))
self.root_layout.addWidget(self.move_left_button, 2, 0)
# Left tweak button
self.move_left_tweak_button = QToolButton(parent=self)
self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
self.move_left_tweak_button.setIcon(material_icon("keyboard_arrow_left"))
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
# Right
self.move_right_button = QToolButton(parent=self)
self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
self.move_right_button.setIcon(material_icon("keyboard_double_arrow_right"))
self.root_layout.addWidget(self.move_right_button, 2, 4)
# Right tweak button
self.move_right_tweak_button = QToolButton(parent=self)
self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
self.move_right_tweak_button.setIcon(material_icon("keyboard_arrow_right"))
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
# Down
self.move_down_button = QToolButton(parent=self)
self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
self.move_down_button.setIcon(material_icon("keyboard_double_arrow_down"))
self.root_layout.addWidget(self.move_down_button, 4, 2)
# Down tweak button
self.move_down_tweak_button = QToolButton(parent=self)
self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
self.move_down_tweak_button.setIcon(material_icon("keyboard_arrow_down"))
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
# Connections
@@ -124,8 +129,20 @@ class XRayEye2DControl(BECWidget, QWidget):
class XRayEye(BECWidget, QWidget):
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
USER_ACCESS = [
"active_roi",
"user_message",
"user_message.setter",
"on_live_view_enabled",
"on_motors_enable",
"enable_submit_button",
"sample_name",
"sample_name.setter",
"enable_move_buttons",
"enable_move_buttons.setter",
"switch_tab",
"submit_fit_array",
]
PLUGIN = True
def __init__(self, parent=None, **kwargs):
@@ -136,28 +153,44 @@ class XRayEye(BECWidget, QWidget):
self._make_connections()
# Connection to redis endpoints
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
self.bec_dispatcher.connect_slot(
self.getting_shutter_status, MessageEndpoints.device_readback("omnyfsh")
)
self.bec_dispatcher.connect_slot(
self.getting_camera_status, MessageEndpoints.device_read_configuration(CAMERA[0])
)
self.connect_motors()
self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger)
def _init_ui(self):
self.core_layout = QHBoxLayout(self)
self.root_layout = QVBoxLayout(self)
self.tab_widget = QTabWidget(parent=self)
self.root_layout.addWidget(self.tab_widget)
self.image = Image(parent=self)
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
self.alignment_tab = QWidget(parent=self)
self.core_layout = QHBoxLayout(self.alignment_tab)
self.image = Image(parent=self.alignment_tab)
self.image.color_map = "CET-L2"
self.image.enable_toolbar = (
False # Disable default toolbar to not allow to user set anything
)
self.image.inner_axes = False # Disable inner axes to maximize image area
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
self.image.enable_full_colorbar = True
self.image.invert_y = True # Invert y axis to match image coordinates
# Control panel on the right: vertical layout inside a fixed-width widget
self.control_panel = QWidget(parent=self)
self.control_panel = QWidget(parent=self.alignment_tab)
self.control_panel_layout = QVBoxLayout(self.control_panel)
self.control_panel_layout.setContentsMargins(0, 0, 0, 0)
self.control_panel_layout.setSpacing(10)
# ROI toolbar + Live toggle (header row)
self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
compact_orientation="horizontal")
self.roi_manager = ROIPropertyTree(
parent=self, image_widget=self.image, compact=True, compact_orientation="horizontal"
)
header_row = QHBoxLayout()
header_row.setContentsMargins(0, 0, 0, 0)
header_row.setSpacing(8)
@@ -166,16 +199,36 @@ class XRayEye(BECWidget, QWidget):
self.live_preview_label = QLabel("Live Preview", parent=self)
self.live_preview_toggle = ToggleSwitch(parent=self)
self.live_preview_toggle.checked = False
header_row.addWidget(self.live_preview_label, 0, Qt.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignVCenter)
header_row.addWidget(self.live_preview_label, 0, Qt.AlignmentFlag.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
self.control_panel_layout.addLayout(header_row)
switch_row = QHBoxLayout()
switch_row.setContentsMargins(0, 0, 0, 0)
switch_row.setSpacing(8)
switch_row.addStretch()
self.camera_running_label = QLabel("Camera running", parent=self)
self.camera_running_toggle = ToggleSwitch(parent=self)
# self.camera_running_toggle.checked = False
self.camera_running_toggle.enabled.connect(self.camera_running_enabled)
self.shutter_label = QLabel("Shutter open", parent=self)
self.shutter_toggle = ToggleSwitch(parent=self)
# self.shutter_toggle.checked = False
self.shutter_toggle.enabled.connect(self.opening_shutter)
switch_row.addWidget(self.shutter_label, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.shutter_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.camera_running_label, 0, Qt.AlignmentFlag.AlignVCenter)
switch_row.addWidget(self.camera_running_toggle, 0, Qt.AlignmentFlag.AlignVCenter)
self.control_panel_layout.addLayout(switch_row)
# separator
self.control_panel_layout.addWidget(self._create_separator())
# 2D Positioner (fixed size)
self.motor_control_2d = XRayEye2DControl(parent=self)
self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignTop | Qt.AlignCenter)
self.control_panel_layout.addWidget(
self.motor_control_2d, 0, Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignCenter
)
# separator
self.control_panel_layout.addWidget(self._create_separator())
@@ -190,9 +243,8 @@ class XRayEye(BECWidget, QWidget):
# Submit button
self.submit_button = QPushButton("Submit", parent=self)
# Add to layout form
step_size_form.addWidget(QLabel("Horizontal", parent=self), 0, 0)
step_size_form.addWidget(QLabel("Step Size", parent=self), 0, 0)
step_size_form.addWidget(self.step_size, 0, 1)
step_size_form.addWidget(QLabel("Vertical", parent=self), 1, 0)
step_size_form.addWidget(self.submit_button, 2, 0, 1, 2)
# Add form to control panel
@@ -207,7 +259,8 @@ class XRayEye(BECWidget, QWidget):
self.sample_name_line_edit.setReadOnly(True)
form.addWidget(QLabel("Sample", parent=self), 0, 0)
form.addWidget(self.sample_name_line_edit, 0, 1)
self.message_line_edit = QLineEdit(parent=self)
self.message_line_edit = QTextEdit(parent=self)
self.message_line_edit.setFixedHeight(60)
self.message_line_edit.setReadOnly(True)
form.addWidget(QLabel("Message", parent=self), 1, 0)
form.addWidget(self.message_line_edit, 1, 1)
@@ -217,12 +270,57 @@ class XRayEye(BECWidget, QWidget):
self.control_panel.adjustSize()
p_hint = self.control_panel.sizeHint()
self.control_panel.setFixedWidth(p_hint.width())
self.control_panel.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
self.control_panel.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Expanding)
# Core Layout: image (expanding) | control panel (fixed)
self.core_layout.addWidget(self.image)
self.core_layout.addWidget(self.control_panel)
self.tab_widget.addTab(self.alignment_tab, "Alignment")
self.fit_tab = QWidget(parent=self)
self.fit_layout = QVBoxLayout(self.fit_tab)
self.waveform_x = Waveform(parent=self.fit_tab)
self.waveform_y = Waveform(parent=self.fit_tab)
self.waveform_x.plot(
x=[0],
y=[1],
label="fit-x",
dap=["SineModel", "LinearModel"],
dap_parameters=[
{"frequency": {"value": 0.0174533, "vary": False, "min": 0.01, "max": 0.02}},
{"slope": {"value": 0, "vary": False, "min": 0.0, "max": 0.02}},
],
dap_oversample=5,
)
self.waveform_y.plot(
x=[0],
y=[2],
label="fit-y",
dap=["SineModel", "LinearModel"],
dap_parameters=[
{"frequency": {"value": 0.0174533, "vary": False, "min": 0.01, "max": 0.02}},
{"slope": {"value": 0, "vary": False, "min": 0.0, "max": 0.02}},
],
dap_oversample=5,
)
self.fit_x = self.waveform_x.curves[0]
self.fit_y = self.waveform_y.curves[0]
self.waveform_x.dap_params_update.connect(self.on_dap_params)
self.waveform_y.dap_params_update.connect(self.on_dap_params)
for wave in (self.waveform_x, self.waveform_y):
wave.x_label = "Angle (deg)"
wave.x_grid = True
wave.y_grid = True
wave.enable_toolbar = True
self.fit_layout.addWidget(self.waveform_x)
self.fit_layout.addWidget(self.waveform_y)
self.tab_widget.addTab(self.fit_tab, "Fit")
def _make_connections(self):
# Fetch initial state
self.on_live_view_enabled(True)
@@ -230,31 +328,36 @@ class XRayEye(BECWidget, QWidget):
# Make connections
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
self.step_size.valueChanged.connect(
lambda x: self.motor_control_2d.setProperty("step_size", x)
)
self.submit_button.clicked.connect(self.submit)
def _create_separator(self):
sep = QFrame(parent=self)
sep.setFrameShape(QFrame.HLine)
sep.setFrameShadow(QFrame.Sunken)
sep.setFrameShape(QFrame.Shape.HLine)
sep.setFrameShadow(QFrame.Shadow.Sunken)
sep.setLineWidth(1)
return sep
def _init_gui_trigger(self):
self.dev.omny_xray_gui.read()
self.dev.omnyfsh.read()
################################################################################
# Device Connection logic
################################################################################
def connect_motors(self):
""" Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
"""Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ["osamroy", "lsamrot", "fsamroy"]
for motor in possible_motors:
if motor in self.dev:
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
logger.info(f"Succesfully connected to {motor}")
self.bec_dispatcher.connect_slot(
self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor)
)
logger.info(f"Successfully connected to {motor}")
################################################################################
# Properties ported from the original OmnyAlignment, can be adjusted as needed
@@ -264,6 +367,7 @@ class XRayEye(BECWidget, QWidget):
return self.message_line_edit.text()
@user_message.setter
@rpc_timeout(20)
def user_message(self, message: str):
self.message_line_edit.setText(message)
@@ -272,6 +376,7 @@ class XRayEye(BECWidget, QWidget):
return self.sample_name_line_edit.text()
@sample_name.setter
@rpc_timeout(20)
def sample_name(self, message: str):
self.sample_name_line_edit.setText(message)
@@ -291,6 +396,14 @@ class XRayEye(BECWidget, QWidget):
# Slots ported from the original OmnyAlignment, can be adjusted as needed
################################################################################
@SafeSlot(str)
@rpc_timeout(20)
def switch_tab(self, tab: str):
if tab == "fit":
self.tab_widget.setCurrentIndex(1)
else:
self.tab_widget.setCurrentIndex(0)
@SafeSlot()
def get_roi_coordinates(self) -> dict | None:
"""Get the coordinates of the currently active ROI."""
@@ -302,20 +415,56 @@ class XRayEye(BECWidget, QWidget):
return roi.get_coordinates()
@SafeSlot(bool)
@rpc_timeout(20)
def on_live_view_enabled(self, enabled: bool):
logger.info(f"Live view is enabled: {enabled}")
self.live_preview_toggle.blockSignals(True)
if enabled:
self.live_preview_toggle.checked = enabled
self.image.image(CAMERA)
self.image.image(device=CAMERA[0], signal=CAMERA[1])
self.live_preview_toggle.blockSignals(False)
return
self.image.disconnect_monitor(CAMERA)
self.image.disconnect_monitor(CAMERA[0], CAMERA[1])
self.live_preview_toggle.checked = enabled
self.live_preview_toggle.blockSignals(False)
@SafeSlot(bool)
def camera_running_enabled(self, enabled: bool):
logger.info(f"Camera running: {enabled}")
self.camera_running_toggle.blockSignals(True)
self.dev.get(CAMERA[0]).live_mode_enabled.put(enabled)
self.camera_running_toggle.checked = enabled
self.camera_running_toggle.blockSignals(False)
@SafeSlot(dict, dict)
def getting_camera_status(self, data, meta):
print(f"msg:{data}")
live_mode_enabled = data.get("signals").get(f"{CAMERA[0]}_live_mode_enabled").get("value")
self.camera_running_toggle.blockSignals(True)
self.camera_running_toggle.checked = live_mode_enabled
self.camera_running_toggle.blockSignals(False)
@SafeSlot(bool)
def opening_shutter(self, enabled: bool):
logger.info(f"Shutter changed from GUI to: {enabled}")
self.shutter_toggle.blockSignals(True)
if enabled:
self.dev.omnyfsh.fshopen()
else:
self.dev.omnyfsh.fshclose()
# self.shutter_toggle.checked = enabled
self.shutter_toggle.blockSignals(False)
@SafeSlot(dict, dict)
def getting_shutter_status(self, data, meta):
shutter_open = bool(data.get("signals").get("omnyfsh_shutter").get("value"))
self.shutter_toggle.blockSignals(True)
self.shutter_toggle.checked = shutter_open
self.shutter_toggle.blockSignals(False)
@SafeSlot(bool, bool)
@rpc_timeout(20)
def on_motors_enable(self, x_enable: bool, y_enable: bool):
"""
Enable/Disable motor controls
@@ -327,98 +476,110 @@ class XRayEye(BECWidget, QWidget):
self.motor_control_2d.enable_controls_hor(x_enable)
self.motor_control_2d.enable_controls_ver(y_enable)
@SafeSlot(int)
def enable_submit_button(self, enable: int):
@SafeSlot(bool)
@rpc_timeout(20)
def enable_submit_button(self, enable: bool):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
if enable == -1:
self.submit_button.setEnabled(False)
else:
if enable:
self.submit_button.setEnabled(True)
else:
self.submit_button.setEnabled(False)
@SafeSlot(dict, dict)
def on_dap_params(self, data, meta):
print("#######################################")
print("getting dap parameters")
print(f"data: {data}")
print(f"meta: {meta}")
self.waveform_x.auto_range(True)
self.waveform_y.auto_range(True)
# self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
curve_id = meta.get("curve_id")
if curve_id == "fit-x-SineModel+LinearModel":
self.dev.omny_xray_gui.fit_params_x.set(data).wait()
print(f"setting x data to {data}")
else:
self.dev.omny_xray_gui.fit_params_y.set(data).wait()
print(f"setting y data to {data}")
# self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
@SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict):
#TODO implement if needed
# TODO implement if needed
print(f"data: {data}")
print(f"meta: {meta}")
@SafeSlot(dict, dict)
def device_updates(self, data: dict, meta: dict):
"""
Slot to handle device updates from omny_xray_gui device.
Args:
data(dict): data from device
meta(dict): metadata from device
"""
signals = data.get('signals')
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
self.on_live_view_enabled(bool(enable_live_preview))
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
# Signals from epics gui device
# send message
user_message = signals.get("omny_xray_gui_send_message").get('value')
self.user_message = user_message
# sample name
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
self.sample_name = sample_message
# enable frame acquisition
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
self.on_live_view_enabled(bool(update_frame_acq))
# enable submit button
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
self.enable_submit_button(enable_submit_button)
@SafeSlot()
@rpc_timeout(20)
def submit_fit_array(self, fit_array):
self.tab_widget.setCurrentIndex(1)
# self.fix_x.title = " got fit array"
print(f"got fit array {fit_array}")
self.waveform_x.curves[0].set_data(x=fit_array[0], y=fit_array[1])
self.waveform_y.curves[0].set_data(x=fit_array[0], y=fit_array[2])
# self.fit_x.set_data(x=fit_array[0],y=fit_array[1])
# self.fit_y.set_data(x=fit_array[0],y=fit_array[2])
@SafeSlot()
def submit(self):
"""Execute submit action by submit button."""
print("submit pushed")
self.submit_button.blockSignals(True)
if self.roi_manager.single_active_roi is None:
logger.warning("No active ROI")
return
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
roi_center_x = roi_coordinates['center_x']
roi_center_y = roi_coordinates['center_y']
roi_center_x = roi_coordinates["center_x"]
roi_center_y = roi_coordinates["center_y"]
# Case of rectangular ROI
if isinstance(self.roi_manager.single_active_roi, RectangularROI):
roi_width = roi_coordinates['width']
roi_height = roi_coordinates['height']
roi_width = roi_coordinates["width"]
roi_height = roi_coordinates["height"]
elif isinstance(self.roi_manager.single_active_roi, CircularROI):
roi_width = roi_coordinates['diameter']
roi_height = roi_coordinates['radius']
roi_width = roi_coordinates["diameter"]
roi_height = roi_coordinates["radius"]
else:
logger.warning("Unsupported ROI type for submit action.")
return
print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
print(
f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}"
) # TODO remove when will be not needed for debugging
# submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get("value"))
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
xval_x = getattr(self.dev.omny_xray_gui, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1)
print("submit done")
self.submit_button.blockSignals(False)
def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
getattr(self.dev,CAMERA[0]).live_mode = False
self.bec_dispatcher.disconnect_slot(
self.device_updates, MessageEndpoints.device_readback("omny_xray_gui")
)
getattr(self.dev, CAMERA[0]).stop_live_mode()
super().cleanup()
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.colors import apply_theme
app = QApplication(sys.argv)
apply_theme("light")
dispatcher = BECDispatcher(gui_id="xray")
win = XRayEye()
win.resize(1000, 800)

View File

@@ -9,27 +9,27 @@ eiger_1_5:
readoutPriority: async
softwareTrigger: False
eiger_9:
description: Eiger 9M detector
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
deviceConfig:
detector_distance: 100
beam_center: [0, 0]
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: False
# eiger_9:
# description: Eiger 9M detector
# deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
# deviceConfig:
# detector_distance: 100
# beam_center: [0, 0]
# onFailure: raise
# enabled: true
# readoutPriority: async
# softwareTrigger: False
ids_cam:
description: IDS camera for live image acquisition
deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
deviceConfig:
camera_id: 201
bits_per_pixel: 24
m_n_colormode: 1
live_mode: True
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: True
# ids_cam:
# description: IDS camera for live image acquisition
# deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
# deviceConfig:
# camera_id: 201
# bits_per_pixel: 24
# m_n_colormode: 1
# live_mode: True
# onFailure: raise
# enabled: true
# readoutPriority: async
# softwareTrigger: True

View File

@@ -0,0 +1,25 @@
############################################################
##################### EPS ##################################
############################################################
x12saEPS:
description: X12SA EPS info and control
deviceClass: csaxs_bec.devices.epics.eps.EPS
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
##################### GalilRIO #############################
############################################################
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesxbox.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20

View File

@@ -1,11 +1,11 @@
# This is the main configuration file that is
# commented or uncommented according to the type of experiment
optics:
- !include ./bl_optics_hutch.yaml
# optics:
# - !include ./bl_optics_hutch.yaml
frontend:
- !include ./bl_frontend.yaml
# frontend:
# - !include ./bl_frontend.yaml
endstation:
- !include ./bl_endstation.yaml
@@ -16,8 +16,8 @@ detectors:
#sastt:
# - !include ./sastt.yaml
#flomni:
# - !include ./ptycho_flomni.yaml
flomni:
- !include ./ptycho_flomni.yaml
#omny:
# - !include ./ptycho_omny.yaml

View File

@@ -435,9 +435,9 @@ cam_xeye:
# deviceConfig:
# camera_id: 203
# bits_per_pixel: 24
# num_rotation_90: 3
# num_rotation_90: 2
# transpose: false
# force_monochrome: true
# force_monochrome: false
# m_n_colormode: 1
# enabled: true
# onFailure: buffer
@@ -471,8 +471,8 @@ omnyfsh:
#################### GUI Signals ###########################
############################################################
omny_xray_gui:
description: Gui Epics signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
description: Gui signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayAlignGUI
deviceConfig: {}
enabled: true
onFailure: buffer
@@ -486,4 +486,25 @@ calculated_signal:
compute_method: "def just_rand():\n return 42"
enabled: true
readOnly: false
readoutPriority: baseline
readoutPriority: baseline
############################################################
#################### OMNY Pandabox #########################
############################################################
omny_panda:
readoutPriority: async
deviceClass: csaxs_bec.devices.panda_box.panda_box_omny.PandaBoxOMNY
deviceConfig:
host: omny-panda.psi.ch
signal_alias:
FMC_IN.VAL1.Min: cap_voltage_fzp_y_min
FMC_IN.VAL1.Max: cap_voltage_fzp_y_max
FMC_IN.VAL1.Mean: cap_voltage_fzp_y_mean
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
deviceTags:
- detector
enabled: true
readOnly: false
softwareTrigger: false

View File

@@ -271,4 +271,20 @@ rty:
enabled: true
readOnly: False
############################################################
######################### Cameras ##########################
############################################################
cam_xeye:
description: Camera LamNI Xray eye ID15
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
deviceConfig:
camera_id: 15
bits_per_pixel: 24
num_rotation_90: 3
transpose: false
force_monochrome: true
m_n_colormode: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async

View File

@@ -0,0 +1,161 @@
"""Module for the EPICS integration of the AlliedVision Camera via Vimba SDK."""
import threading
import traceback
from enum import IntEnum
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt, Kind, Signal
from ophyd.areadetector import ADComponent as ADCpt
from ophyd.areadetector import DetectorBase
from ophyd_devices import PreviewSignal
from ophyd_devices.devices.areadetector.cam import VimbaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
logger = bec_logger.logger
class ACQUIRE_MODES(IntEnum):
"""Acquiring enums for Allied Vision Camera"""
ACQUIRING = 1
DONE = 0
class AlliedVisionCamera(PSIDeviceBase, DetectorBase):
"""
Epics Area Detector interface for the Allied Vision Alvium G1-507m camera via Vimba SDK.
The IOC runs with under the prefix: 'X12SA-GIGECAM-AV1:'.
"""
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
cam = ADCpt(VimbaDetectorCam, "cam1:")
image = ADCpt(ImagePlugin, "image1:")
preview = Cpt(
PreviewSignal,
name="preview",
ndim=2,
num_rotation_90=0,
transpose=False,
doc="Preview signal of the AlliedVision camera.",
)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
def __init__(
self,
*,
name: str,
prefix: str,
poll_rate: int = 5,
num_rotation_90: int = 0,
transpose: bool = False,
scan_info=None,
device_manager=None,
**kwargs,
):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._poll_thread = threading.Thread(
target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
)
self._poll_thread_kill_event = threading.Event()
self._poll_start_event = threading.Event()
if poll_rate <= 0:
logger.warning(
f"Poll rate must be positive for Camera {self.name} and non-zero, setting to 1 Hz."
)
poll_rate = 1
self.stop_live_mode()
elif poll_rate > 10:
logger.warning(f"Poll rate too high for Camera {self.name}, setting to 10 Hz max.")
poll_rate = 10
self._poll_rate = poll_rate
self._unique_array_id = 0
self._pv_timeout = 2.0
self.image: ImagePlugin
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
self._live_mode_lock = threading.RLock()
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
self.cam.acquire.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None:
"""Start live mode."""
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
"""Stop live mode."""
self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
self._apply_live_mode(bool(value))
def _apply_live_mode(self, enabled: bool) -> None:
with self._live_mode_lock:
if enabled:
if not self._poll_start_event.is_set():
self._poll_start_event.set()
self.cam.acquire.put(ACQUIRE_MODES.ACQUIRING.value) # Start acquisition
else:
logger.info(f"Live mode already started for {self.name}.")
return
if self._poll_start_event.is_set():
self._poll_start_event.clear()
self.cam.acquire.put(ACQUIRE_MODES.DONE.value) # Stop acquisition
else:
logger.info(f"Live mode already stopped for {self.name}.")
def on_connected(self):
"""Reset the unique array ID on connection."""
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout)
self._poll_thread.start()
def _poll_array_data(self):
"""Poll the array data for preview updates."""
while self._poll_start_event.wait():
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
try:
# First check if there is a new image
if self.image.unique_id.get() != self._unique_array_id:
self._unique_array_id = self.image.unique_id.get()
else:
continue # No new image, skip update
# Get new image data
value = self.image.array_data.get()
if value is None:
logger.info(f"No image data available for preview of {self.name}")
continue
array_size = self.image.array_size.get()
if array_size[0] == 0: # 2D image, not color image
array_size = array_size[1:]
# Geometry correction for the image
data = np.reshape(value, array_size)
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Error while polling array data for preview of {self.name}: {content}"
)
def on_destroy(self):
"""Stop the polling thread on destruction."""
self._poll_start_event.set()
self._poll_thread_kill_event.set()
if self._poll_thread.is_alive():
self._poll_thread.join(timeout=2)

View File

@@ -309,7 +309,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Shutter opens without delay at t0, closes after exp_time * burst_count + 2ms (self._shutter_to_open_delay)
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
self.set_delay_pairs(channel="gh", delay=self._shutter_to_open_delay, width=(shutter_width-self._shutter_to_open_delay))
self.set_delay_pairs(
channel="gh",
delay=self._shutter_to_open_delay,
width=(shutter_width - self._shutter_to_open_delay),
)
# Trigger extra pulse for MCS OR gate
# f = e + 1us
@@ -520,7 +524,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
- Return the status object to BEC which will automatically resolve once the status register has
the END_OF_BURST bit set. The callback of the status object will also stop the polling loop.
"""
overall_start = time.time()
self._stop_polling()
# NOTE If the trigger source is not SINGLE_SHOT, the DDG is triggered by an external source
@@ -559,7 +562,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Send trigger
self.trigger_shot.put(1, use_complete=True)
self.cancel_on_stop(status)
logger.info(f"Configured ddg in {time.time()-overall_start}")
return status
def on_stop(self) -> None:

View File

@@ -261,7 +261,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
**kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance).
"""
with self._rlock:
logger.info(f"Received update on mcs card {self.name}")
if self._omit_mca_callbacks.is_set():
return # Suppress callbacks when erasing all channels
self._mca_counter_index += 1
@@ -293,9 +292,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
)
# Once we have received all channels, push data to BEC and reset for next accumulation
logger.info(
f"Received update for {attr_name}, index {self._mca_counter_index}/{self.NUM_MCA_CHANNELS}"
)
if len(self._current_data) == self.NUM_MCA_CHANNELS:
logger.debug(
f"Current data index {self._current_data_index} complete, pushing to BEC."
@@ -398,11 +394,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# NOTE Make sure that the signal that omits mca callbacks is cleared
self._omit_mca_callbacks.clear()
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
# For a fly scan we need to start the mcs card ourselves
if self.scan_info.msg.scan_type == "fly":
self.erase_start.put(1)
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_prescan(self) -> None | StatusBase:
"""
This method is called after on_stage and before the scan starts. For the MCS card, we need to make sure
@@ -446,7 +443,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
while not self._scan_done_thread_kill_event.is_set():
while self._start_monitor_async_data_emission.wait():
try:
logger.debug(f"Monitoring async data emission for {self.name}...")
if (
hasattr(self.scan_info.msg, "num_points")
and self.scan_info.msg.num_points is not None
@@ -456,7 +452,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
for callback in self._scan_done_callbacks:
callback(exception=None)
else:
logger.info(f"Current data index is {self._current_data_index}")
if self._current_data_index >= 1:
for callback in self._scan_done_callbacks:
callback(exception=None)

View File

@@ -156,7 +156,6 @@ class Camera:
camera_id (int): The ID of the camera device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
"""
def __init__(

View File

@@ -3,21 +3,19 @@
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
from typing import TYPE_CHECKING, Literal
import numpy as np
from ophyd import Component as Cpt, Signal, Kind
from bec_lib import messages
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger
@@ -45,8 +43,15 @@ class IDSCamera(PSIDeviceBase):
doc="Signal for the region of interest (ROI).",
async_update={"type": "add", "max_shape": [None]},
)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
USER_ACCESS = ["start_live_mode", "stop_live_mode", "mask", "set_rect_roi", "get_last_image"]
def __init__(
self,
@@ -83,15 +88,22 @@ class IDSCamera(PSIDeviceBase):
bits_per_pixel=bits_per_pixel,
connect=False,
)
self._live_mode = False
self._inputs = {"live_mode": live_mode}
self._mask = np.zeros((1, 1), dtype=np.uint8)
self.image.num_rotation_90 = num_rotation_90
self.image.transpose = transpose
self._force_monochrome = force_monochrome
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
self.live_mode_enabled.put(bool(live_mode))
############## Live Mode Methods ##############
def start_live_mode(self) -> None:
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
self.live_mode_enabled.put(False)
@property
def mask(self) -> np.ndarray:
"""Return the current region of interest (ROI) for the camera."""
@@ -114,22 +126,15 @@ class IDSCamera(PSIDeviceBase):
)
self._mask = value
@property
def live_mode(self) -> bool:
"""Return whether the camera is in live mode."""
return self._live_mode
@live_mode.setter
def live_mode(self, value: bool):
"""Set the live mode for the camera."""
if value != self._live_mode:
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
self._stop_live()
def _on_live_mode_enabled_changed(self, *args, value, **kwargs):
"""Callback for when live mode is changed."""
enabled = bool(value)
if enabled and self.cam._connected is False: # pylint: disable=protected-access
self.cam.on_connect()
if enabled:
self._start_live()
else:
self._stop_live()
def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera."""
@@ -196,7 +201,7 @@ class IDSCamera(PSIDeviceBase):
"""Connect to the camera."""
self.cam.force_monochrome = self._force_monochrome
self.cam.on_connect()
self.live_mode = self._inputs.get("live_mode", False)
self.live_mode_enabled.put(bool(self._inputs.get("live_mode", False)))
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
def on_destroy(self):
@@ -206,7 +211,7 @@ class IDSCamera(PSIDeviceBase):
def on_trigger(self):
"""Handle the trigger event."""
if not self.live_mode:
if not bool(self.live_mode_enabled.get()):
return
image = self.image.get()
if image is not None:

View File

@@ -132,7 +132,6 @@ class Eiger(PSIDeviceBase):
if data is None:
logger.error(f"Received image message on device {self.name} without data.")
return
logger.info(f"Received preview image on device {self.name}")
self.preview_image.put(data)
# pylint: disable=missing-function-docstring

View File

@@ -15,7 +15,6 @@ from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilAxesReferenced,
GalilController,
GalilMotorIsMoving,
GalilMotorResolution,
GalilSetpointSignal,
GalilSignalRO,
retry_once,
@@ -24,6 +23,19 @@ from csaxs_bec.devices.omny.galil.galil_ophyd import (
logger = bec_logger.logger
class GalilMotorResolution(GalilSignalRO):
@retry_once
@threadlocked
def _socket_get(self):
if self.parent.axis_Id_numeric < 6:
return float(
self.controller.socket_put_and_receive(f"MG encpermm[{self.parent.axis_Id_numeric}]")
)
else:
return float(
self.controller.socket_put_and_receive(f"MG stppermm[{self.parent.axis_Id_numeric}]")
)
class LamniGalilController(GalilController):
@@ -154,11 +166,18 @@ class LamniGalilReadbackSignal(GalilSignalRO):
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
current_pos = float(self.controller.socket_put_and_receive(f"TD{self.parent.axis_Id}"))
current_pos *= self.parent.sign
step_mm = self.parent.motor_resolution.get()
return current_pos / step_mm
if self.parent.axis_Id_numeric < 6:
current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}"))
current_pos *= self.parent.sign
encoder_resolution = self.parent.motor_resolution.get()
logger.info(f"Read galil encoder position of axis {self.parent.axis_Id_numeric} to be TP {current_pos} with resolution {encoder_resolution}")
return current_pos / encoder_resolution
else:
current_pos = float(self.controller.socket_put_and_receive(f"TD{self.parent.axis_Id}"))
current_pos *= self.parent.sign
step_mm = self.parent.motor_resolution.get()
return current_pos / step_mm
def read(self):
self._metadata["timestamp"] = time.time()
val = super().read()

View File

@@ -2,7 +2,7 @@ import requests
import threading
import cv2
import numpy as np
from ophyd import Device, Component as Cpt
from ophyd import Device, Component as Cpt, Kind, Signal
from ophyd_devices import PreviewSignal
import traceback
@@ -13,6 +13,13 @@ logger = bec_logger.logger
class WebcamViewer(Device):
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
live_mode_enabled = Cpt(
Signal,
name="live_mode_enabled",
value=False,
doc="Enable or disable live mode.",
kind=Kind.config,
)
def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None:
super().__init__(name=name, **kwargs)
@@ -21,20 +28,54 @@ class WebcamViewer(Device):
self._update_thread = None
self._buffer = b""
self._shutdown_event = threading.Event()
self._live_mode_lock = threading.RLock()
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
def start_live_mode(self) -> None:
if self._connection is not None:
return
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
self.live_mode_enabled.put(True)
def stop_live_mode(self) -> None:
self.live_mode_enabled.put(False)
def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
self._apply_live_mode(bool(value))
def _apply_live_mode(self, enabled: bool) -> None:
with self._live_mode_lock:
if enabled:
if self._update_thread is not None and self._update_thread.is_alive():
return
self._shutdown_event.clear()
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
return
if self._update_thread is None:
return
self._shutdown_event.set()
if self._connection is not None:
try:
self._connection.close()
except Exception: # pylint: disable=broad-except
pass
self._connection = None
self._update_thread.join(timeout=2)
if self._update_thread.is_alive():
logger.warning("Webcam live mode thread did not stop within timeout.")
return
self._update_thread = None
self._buffer = b""
self._shutdown_event.clear()
def _update_loop(self) -> None:
while not self._shutdown_event.is_set():
try:
self._connection = requests.get(self.url, stream=True)
self._connection = requests.get(self.url, stream=True, timeout=5)
for chunk in self._connection.iter_content(chunk_size=1024):
if self._shutdown_event.is_set():
break
self._buffer += chunk
start = self._buffer.find(b'\xff\xd8') # JPEG start
end = self._buffer.find(b'\xff\xd9') # JPEG end
@@ -50,16 +91,3 @@ class WebcamViewer(Device):
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Image update loop failed: {content}")
def stop_live_mode(self) -> None:
if self._connection is None:
return
self._shutdown_event.set()
if self._connection is not None:
self._connection.close()
self._connection = None
if self._update_thread is not None:
self._update_thread.join()
self._update_thread = None
self._shutdown_event.clear()

View File

@@ -1,74 +1,44 @@
from ophyd import Component as Cpt
import numpy as np
from ophyd import Component as Cpt, Signal, EpicsSignal
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
class OMNYXRayAlignGUI(Device):
class OMNYXRayEpicsGUI(Device):
update_frame_acqdone = Cpt(Signal, value=0)
update_frame_acq = Cpt(Signal, value=0)
enable_mv_x = Cpt(Signal, value=0)
enable_mv_y = Cpt(Signal, value=0)
send_message = Cpt(Signal, value=0)
sample_name = Cpt(Signal, value=0)
angle = Cpt(Signal, value=0)
pixel_size = Cpt(Signal, value=0)
submit = Cpt(EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0", auto_monitor=True)
step = Cpt(Signal, value=0)
recbg = Cpt(Signal, value=0)
mvx = Cpt(Signal, value=0)
mvy = Cpt(Signal, value=0)
save_frame = Cpt(
EpicsSignal, name="save_frame", read_pv="XOMNYI-XEYE-SAVFRAME:0",auto_monitor=True
)
update_frame_acqdone = Cpt(
EpicsSignal, name="update_frame_acqdone", read_pv="XOMNYI-XEYE-ACQDONE:0",auto_monitor=True
)
update_frame_acq = Cpt(
EpicsSignal, name="update_frame_acq", read_pv="XOMNYI-XEYE-ACQ:0",auto_monitor=True
)
width_y_dynamic = {
f"width_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YWIDTH_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_y = Dcpt(width_y_dynamic)
width_x_dynamic = {
f"width_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XWIDTH_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_x = Dcpt(width_x_dynamic)
enable_mv_x = Cpt(
EpicsSignal, name="enable_mv_x", read_pv="XOMNYI-XEYE-ENAMVX:0",auto_monitor=True
)
enable_mv_y = Cpt(
EpicsSignal, name="enable_mv_y", read_pv="XOMNYI-XEYE-ENAMVY:0",auto_monitor=True
)
send_message = Cpt(
EpicsSignal, name="send_message", read_pv="XOMNYI-XEYE-MESSAGE:0.DESC",auto_monitor=True
)
sample_name = Cpt(
EpicsSignal, name="sample_name", read_pv="XOMNYI-XEYE-SAMPLENAME:0.DESC",auto_monitor=True
)
angle = Cpt(
EpicsSignal, name="angle", read_pv="XOMNYI-XEYE-ANGLE:0",auto_monitor=True
)
pixel_size = Cpt(
EpicsSignal, name="pixel_size", read_pv="XOMNYI-XEYE-PIXELSIZE:0",auto_monitor=True
)
submit = Cpt(
EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True
)
step = Cpt(
EpicsSignal, name="step", read_pv="XOMNYI-XEYE-STEP:0",auto_monitor=True
)
xval_x_dynamic = {
f"xval_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XVAL_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
xval_x = Dcpt(xval_x_dynamic)
yval_y_dynamic = {
f"yval_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YVAL_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
yval_y = Dcpt(yval_y_dynamic)
recbg = Cpt(
EpicsSignal, name="recbg", read_pv="XOMNYI-XEYE-RECBG:0",auto_monitor=True
)
stage_pos_x_dynamic = {
f"stage_pos_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-STAGEPOSX:{i}", {"auto_monitor": True}) for i in range(1, 6)
}
stage_pos_x = Dcpt(stage_pos_x_dynamic)
mvx = Cpt(
EpicsSignal, name="mvx", read_pv="XOMNYI-XEYE-MVX:0",auto_monitor=True
)
mvy = Cpt(
EpicsSignal, name="mvy", read_pv="XOMNYI-XEYE-MVY:0",auto_monitor=True
)
fit_array = Cpt(Signal, value=np.zeros((3, 10)))
fit_params_x = Cpt(Signal, value=np.zeros((2, 3)))
fit_params_y = Cpt(Signal, value=np.zeros((2, 3)))
# Generate width_y_0 to width_y_10
for i in range(11):
locals()[f"width_y_{i}"] = Cpt(Signal, value=0)
# Generate width_x_0 to width_x_10
for i in range(11):
locals()[f"width_x_{i}"] = Cpt(Signal, value=0)
# Generate xval_x_0 to xval_x_10
for i in range(11):
locals()[f"xval_x_{i}"] = Cpt(Signal, value=0)
# Generate yval_y_0 to yval_y_10
for i in range(11):
locals()[f"yval_y_{i}"] = Cpt(Signal, value=0)
# Generate stage_pos_x_1 to stage_pos_x_5
for i in range(1, 6):
locals()[f"stage_pos_x_{i}"] = Cpt(Signal, value=0)

View File

@@ -0,0 +1,103 @@
"""Module to integrate the PandaBox for cSAXS measurements."""
import time
from bec_lib.logger import bec_logger
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
logger = bec_logger.logger
class PandaBoxCSAXS(PandaBox):
"""
PandaBox integration for cSAXS. This class implements cSAXS specific logic for the PandaBox integration.
TODO: This logic is not yet mapped to any existing hardware. Adapt Docstring once the hardware is defined and integrated.
"""
def on_init(self):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
def on_stage(self):
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
logger.info(f"PandaBox {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_complete(self):
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
def _check_capture_complete():
captured = 0
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
captured = int(ret[0].split("=")[-1])
time.sleep(0.01)
if (time.monotonic() - start_time) > self._timeout_on_completed / 2:
logger.info(
f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
)
if (time.monotonic() - start_time) > self._timeout_on_completed:
raise TimeoutError(
f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
)
finally:
self._disarm()
status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
self.cancel_on_stop(status_captured)
return status_captured
if __name__ == "__main__":
import time
panda = PandaBoxCSAXS(
name="omny_panda",
host="omny-panda.psi.ch",
signal_alias={
"FMC_IN.VAL2.Value": "alias",
"FMC_IN.VAL1.Min": "alias2",
"FMC_IN.VAL1.Max": "alias3",
"FMC_IN.VAL1.Mean": "alias4",
},
)
panda.on_connected()
status = StatusBase(obj=panda)
panda.add_status_callback(
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
)
panda.stop()
status.wait(timeout=2)
panda.unstage()
logger.info(f"Panda connected")
ret = panda.stage()
logger.info(f"Panda staged")
ret = panda.pre_scan()
ret.wait(timeout=5)
logger.info(f"Panda pre scan done")
time.sleep(5)
panda.stop()
st = panda.complete()
st.wait(timeout=5)
logger.info(f"Measurement completed")
panda.unstage()
logger.info(f"Panda Unstaged")

View File

@@ -0,0 +1,99 @@
"""Module to integrate the PandaBox for cSAXS measurements."""
import time
from bec_lib.logger import bec_logger
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
logger = bec_logger.logger
class PandaBoxOMNY(PandaBox):
"""PandaBox integration for OMNY. This class implements OMNY specific logic for the PandaBox integration."""
def on_init(self):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
def on_stage(self):
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
logger.info(f"PandaBox {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_complete(self):
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
def _check_capture_complete():
captured = 0
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
captured = int(ret[0].split("=")[-1])
time.sleep(0.01)
if (time.monotonic() - start_time) > self._timeout_on_completed / 2:
logger.info(
f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
)
if (time.monotonic() - start_time) > self._timeout_on_completed:
raise TimeoutError(
f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
)
finally:
self._disarm()
status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
self.cancel_on_stop(status_captured)
return status_captured
if __name__ == "__main__":
import time
panda = PandaBoxOMNY(
name="omny_panda",
host="omny-panda.psi.ch",
signal_alias={
"FMC_IN.VAL2.Value": "alias",
"FMC_IN.VAL1.Min": "alias2",
"FMC_IN.VAL1.Max": "alias3",
"FMC_IN.VAL1.Mean": "alias4",
},
)
panda.on_connected()
status = StatusBase(obj=panda)
panda.add_status_callback(
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
)
panda.stop()
status.wait(timeout=2)
panda.unstage()
logger.info(f"Panda connected")
ret = panda.stage()
logger.info(f"Panda staged")
ret = panda.pre_scan()
ret.wait(timeout=5)
logger.info(f"Panda pre scan done")
time.sleep(5)
panda.stop()
st = panda.complete()
st.wait(timeout=5)
logger.info(f"Measurement completed")
panda.unstage()
logger.info(f"Panda Unstaged")

View File

@@ -169,6 +169,9 @@ class LamNIMixin:
self.device_manager.devices.lsamx.read_only = True
self.device_manager.devices.lsamy.read_only = True
#update angle readback before start of the scan
yield from self.stubs.send_rpc_and_wait("lsamrot", "readback.get")
yield from self.stubs.send_rpc_and_wait("rtx", "controller.feedback_enable_without_reset")
@@ -210,7 +213,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
def __init__(self, *args, parameter: dict = None, frames_per_trigger:int=1, exp_time:float=0,**kwargs):
"""
A LamNI scan following Fermat's spiral.
@@ -230,10 +233,10 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
Examples:
>>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1)
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1)
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1, frames_per_trigger=1)
"""
super().__init__(parameter=parameter, **kwargs)
super().__init__(parameter=parameter, frames_per_trigger=frames_per_trigger, exp_time=exp_time,**kwargs)
self.axis = []
scan_kwargs = parameter.get("kwargs", {})
self.fov_size = scan_kwargs.get("fov_size")
@@ -482,6 +485,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
yield from self.open_scan()
yield from self.stage()
yield from self.run_baseline_reading()
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()

View File

@@ -52,6 +52,7 @@ class FlomniFermatScan(SyncFlyScanBase):
angle: float = None,
corridor_size: float = 3,
parameter: dict = None,
frames_per_trigger:int=1,
**kwargs,
):
"""
@@ -62,7 +63,8 @@ class FlomniFermatScan(SyncFlyScanBase):
fovy(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 100 um
cenx(float) [um]: center position in x.
ceny(float) [um]: center position in y.
exp_time(float) [s]: exposure time
exp_time(float) [s]: exposure time per burst frame
frames_per_trigger(int) : Number of burst frames per point
step(float) [um]: stepsize
zshift(float) [um]: shift in z
angle(float) [deg]: rotation angle (will rotate first)
@@ -71,10 +73,10 @@ class FlomniFermatScan(SyncFlyScanBase):
Returns:
Examples:
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)
"""
super().__init__(parameter=parameter, exp_time=exp_time, **kwargs)
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
self.show_live_table = False
self.axis = []
self.fovx = fovx
@@ -323,6 +325,7 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.stage()
yield from self.run_baseline_reading()
yield from self._prepare_setup_part2()
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()

View File

@@ -1,4 +1,4 @@
""" Module with JungfrauJochTestScan class. """
"""Module with JungfrauJochTestScan class."""
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion

View File

@@ -51,6 +51,7 @@ class OMNYFermatScan(SyncFlyScanBase):
angle: float = None,
corridor_size: float = 3,
parameter: dict = None,
frames_per_trigger:int=1,
**kwargs,
):
"""
@@ -62,6 +63,7 @@ class OMNYFermatScan(SyncFlyScanBase):
cenx(float) [um]: center position in x.
ceny(float) [um]: center position in y.
exp_time(float) [s]: exposure time
frames_per_trigger:int: Number of burst frames per trigger, defaults to 1.
step(float) [um]: stepsize
zshift(float) [um]: shift in z
angle(float) [deg]: rotation angle (will rotate first)
@@ -73,7 +75,7 @@ class OMNYFermatScan(SyncFlyScanBase):
>>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=10, ceny=0, zshift=0, angle=0, step=2, exp_time=0.01)
"""
super().__init__(parameter=parameter, **kwargs)
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
self.axis = []
self.fovx = fovx
self.fovy = fovy
@@ -299,6 +301,7 @@ class OMNYFermatScan(SyncFlyScanBase):
yield from self.stage()
yield from self.run_baseline_reading()
yield from self._prepare_setup_part2()
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()

View File

@@ -108,12 +108,17 @@ The nano-positioning is controlled by a feedback loop running on a real-time lin
Once the loop has started, it is possible to start bec with the flOMNI configuration file.
Starting bec with session will load the scripts
`bec --session flomni`
The flOMNI scripts can be loaded manually by
`from csaxs_bec.bec_ipython_client.plugins.flomni import Flomni`
`flomni = Flomni(bec)`
Loading the flOMNI configuration (this command will load the OMNY configuration only - isolated from the beamline)
`bec.config.update_session_with_file("/bec/csaxs_bec/csaxs_bec/device_configs/flomni_config.yaml")`
Loading the flOMNI scripts
`from csaxs_bec.bec_ipython_client.plugins.flomni import Flomni`
`flomni = Flomni(bec)`
If the realtime system is restarted, bec will lose communication. To restart:
`flomni.rt_off()` … then wait a few seconds
@@ -138,10 +143,14 @@ This script will first verify that the stages are not in an initialized state, a
The positions of the optics stages are stored as stage parameters and are thus linked to the configuration file.
Example: The OSAx “in” position can be reviewed by `dev.fosax.user_parameter`
Update the value by (example "fosax", "in") by `dev.fosax.update_user_parameter({"in":value})`
Important note: if these values are changed, they are not automatically stored to the config file and will only be available in the current session.
`flomni.ffzp_info()` shows info about the available FZPs at the current energy of the beamline. Optional parameter is the photon _energy_ in keV.
Example: `flomni.ffzp_info(6.2)`
Documents about availabe optics can be accessed by
`flomni.flomnigui_docs`
The [laser feedback](user.ptychography.flomni.laser_feedback) will be disabled and fine alignment lost if foptx/y are moved!
Following functions exist to move the optics in and out, with self-explaining naming.
@@ -193,14 +202,15 @@ The basic scan function can be called by `scans.flomni_fermat_scan()` and offers
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
| cenx (float) | center position in x |
| ceny (float) | center position in y |
| exp_time (float) | exposure time |
| exp_time (float) | exposure time per frame |
| frames_per_trigger(int) | Number of burst frames per position |
| step (float) | stepsize |
| zshift (float) | shift in z |
| angle (float) | rotation angle (will rotate first) |
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
Example:
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)`
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
#### Overview of the alignment steps

View File

@@ -102,13 +102,16 @@ The nano-positioning is controlled by a feedback loop running on a real-time lin
Once the loop has started, it is possible to start bec with the LamNI configuration file.
Loading the LamNI configuration (this command will load the LamNI configuration only - isolated from the beamline)
`bec.config.update_session_with_file("/bec/csaxs_bec/csaxs_bec/device_configs/lamni_config.yaml")`
Loading the LamNI scripts is done by starting bec as
`bec --session lamni`
Loading the LamNI scripts
The scripts can alternatively manually be loaded by
`from csaxs_bec.bec_ipython_client.plugins.LamNI import LamNI`
`lamni = LamNI(bec)`
Loading the LamNI configuration (this command will load the LamNI configuration only - isolated from the beamline)
`bec.config.update_session_with_file("/bec/csaxs_bec/csaxs_bec/device_configs/lamni_config.yaml")`
If the realtime system is restarted, BEC will lose communication. To restart:
`lamni.rt_off()` … then wait a 10 seconds
`lamni.rt_on()`
@@ -152,6 +155,12 @@ The underlying scan function can be called as
Use `scans.lamni_fermat_scan?`for detailed information. A prerequisite for scanning is a running feedback system.
### GUI tools
During operation the BEC GUI will show the relevant cameras or progress information. To manually switch view TAB completion on 'lamni.lamnigui_' will show all options to control the GUI. Most useful
'lamni.lamnigui_show_progress()' will show the measurement progress GUI
'lamnigui_show_xeyealign()' will show the XrayEye alignment GUI
### X-ray optics alignment
The positions of the optics stages are stored as stage parameters and are thus linked to the configuration file.

View File

@@ -327,14 +327,15 @@ The basic scan function can be called by `scans.omny_fermat_scan()` and offers a
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
| cenx (float) | center position in x |
| ceny (float) | center position in y |
| exp_time (float) | exposure time |
| exp_time (float) | exposure time per frame |
| frames_per_trigger(int) | Number of burst frames per position |
| step (float) | stepsize |
| zshift (float) | shift in z |
| angle (float) | rotation angle (will rotate first) |
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
Example:
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)`
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
#### Overview of the alignment steps

View File

@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
name = "csaxs_bec"
version = "0.0.0"
description = "The cSAXS plugin repository for BEC"
requires-python = ">=3.10"
requires-python = ">=3.11"
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",

View File

@@ -54,7 +54,7 @@ def test_on_connected_sets_mask_and_live_mode(ids_camera):
def test_on_trigger_roi_signal(ids_camera):
"""Test the on_trigger method to ensure it processes the ROI signal correctly."""
ids_camera.live_mode = True
ids_camera.start_live_mode()
test_image = np.array([[2, 4], [6, 8]])
test_mask = np.array([[1, 0], [0, 1]], dtype=np.uint8)
ids_camera.mask = test_mask

View File

@@ -0,0 +1,190 @@
"""Module for testing the PandaBoxCSAXS and PandaBoxOMNY devices."""
# pylint: skip-file
from __future__ import annotations
from unittest import mock
import pytest
from ophyd import Staged
from csaxs_bec.devices.panda_box.panda_box import PandaBoxCSAXS
from csaxs_bec.devices.panda_box.panda_box_omny import PandaBoxOMNY
@pytest.fixture
def panda_omny():
dev_name = "panda_omny"
dev = PandaBoxOMNY(
name=dev_name,
host="omny-panda-box.psi.ch",
signal_alias={
"FMC_IN.VAL1.Min": "cap_voltage_fzp_y_min",
"FMC_IN.VAL1.Max": "cap_voltage_fzp_y_max",
"FMC_IN.VAL1.Mean": "cap_voltage_fzp_y_mean",
"FMC_IN.VAL2.Min": "cap_voltage_fzp_x_min",
"FMC_IN.VAL2.Max": "cap_voltage_fzp_x_max",
"FMC_IN.VAL2.Mean": "cap_voltage_fzp_x_mean",
},
)
yield dev
@pytest.fixture
def panda_csaxs():
dev_name = "panda_csaxs"
dev = PandaBoxCSAXS(name=dev_name, host="csaxs-panda-box.psi.ch")
yield dev
def test_panda_omny(panda_omny):
assert panda_omny.name == "panda_omny"
assert panda_omny.host == "omny-panda-box.psi.ch"
all_signal_names = [name for name, _ in panda_omny.data.signals]
# Check that the signal aliases are correctly set up
assert "cap_voltage_fzp_y_min" in all_signal_names
assert "cap_voltage_fzp_y_max" in all_signal_names
assert "cap_voltage_fzp_y_mean" in all_signal_names
assert "cap_voltage_fzp_x_min" in all_signal_names
assert "cap_voltage_fzp_x_max" in all_signal_names
assert "cap_voltage_fzp_x_mean" in all_signal_names
# Check that the original signal names are not present
assert "FMC_IN.VAL1.Min" not in all_signal_names
assert "FMC_IN.VAL1.Max" not in all_signal_names
assert "FMC_IN.VAL1.Mean" not in all_signal_names
assert "FMC_IN.VAL2.Min" not in all_signal_names
assert "FMC_IN.VAL2.Max" not in all_signal_names
assert "FMC_IN.VAL2.Mean" not in all_signal_names
assert panda_omny._acquisition_group == "burst"
assert panda_omny._timeout_on_completed == 10
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
],
)
def test_panda_omny_stage(panda_omny, scan_type, frames_per_trigger, expected_acquisition_group):
# Check that the stage signal is present and has the correct PV
assert len(panda_omny._status_callbacks) == 0
panda_omny.scan_info.msg.scan_type = scan_type
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_omny.stage()
assert panda_omny._acquisition_group == expected_acquisition_group
assert panda_omny.staged == Staged.yes
def test_panda_omny_complete(panda_omny):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_omny.scan_info.msg.num_points = 1
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_omny._timeout_on_completed = 0.5 # Set a short timeout for testing
def _mock_return_captured(*args, **kwargs):
return ["=0"]
# Timeout Error on complete
with (
mock.patch.object(panda_omny, "send_raw", side_effect=_mock_return_captured),
mock.patch.object(panda_omny, "_disarm", return_value=None) as mock_disarm,
):
status = panda_omny.on_complete()
assert status.done is False
assert status.success is False
with pytest.raises(TimeoutError):
status.wait(timeout=4)
mock_disarm.assert_called_once()
# Successful complete
panda_omny._timeout_on_completed = 5
with (
mock.patch.object(panda_omny, "send_raw", side_effect=[["=0"], ["=0"], ["=1"]]),
mock.patch.object(panda_omny, "_disarm", return_value=None) as mock_disarm,
):
status = panda_omny.on_complete()
assert status.done is False
assert status.success is False
status.wait(timeout=4)
mock_disarm.assert_called_once()
assert status.done is True
assert status.success is True
def test_panda_csaxs(panda_csaxs):
assert panda_csaxs.name == "panda_csaxs"
assert panda_csaxs.host == "csaxs-panda-box.psi.ch"
assert panda_csaxs._acquisition_group == "burst"
assert panda_csaxs._timeout_on_completed == 10
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
],
)
def test_panda_csaxs_stage(panda_csaxs, scan_type, frames_per_trigger, expected_acquisition_group):
"""Test the on_stage method of the PandaBoxCSAXS device for different scan types and frames per trigger."""
assert len(panda_csaxs._status_callbacks) == 0
panda_csaxs.scan_info.msg.scan_type = scan_type
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_csaxs.stage()
assert panda_csaxs._acquisition_group == expected_acquisition_group
assert panda_csaxs.staged == Staged.yes
def test_panda_csaxs_complete(panda_csaxs):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_csaxs.scan_info.msg.num_points = 1
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_csaxs._timeout_on_completed = 0.5 # Set a short timeout for testing
def _mock_return_captured(*args, **kwargs):
return ["=0"]
# Timeout Error on complete
with (
mock.patch.object(panda_csaxs, "send_raw", side_effect=_mock_return_captured),
mock.patch.object(panda_csaxs, "_disarm", return_value=None) as mock_disarm,
):
status = panda_csaxs.on_complete()
assert status.done is False
assert status.success is False
with pytest.raises(TimeoutError):
status.wait(timeout=4)
mock_disarm.assert_called_once()
# Successful complete
panda_csaxs._timeout_on_completed = 5
with (
mock.patch.object(panda_csaxs, "send_raw", side_effect=[["=0"], ["=0"], ["=1"]]),
mock.patch.object(panda_csaxs, "_disarm", return_value=None) as mock_disarm,
):
status = panda_csaxs.on_complete()
assert status.done is False
assert status.success is False
status.wait(timeout=4)
mock_disarm.assert_called_once()
assert status.done is True
assert status.success is True

View File

@@ -229,6 +229,22 @@ def device_manager_mock():
"kwargs": {},
},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="lsamrot",
action="rpc",
parameter={
"device": "lsamrot",
"func": "readback.get",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
@@ -302,6 +318,36 @@ def device_manager_mock():
action="set",
parameter={"value": 2.1508313829565293},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
action="pre_scan",
parameter={},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="set",
parameter={"value": 1.3681828686580249},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="set",
parameter={"value": 2.1508313829565293},
),
None,
messages.DeviceInstructionMessage(
metadata={