Compare commits

..

25 Commits

Author SHA1 Message Date
4721ec404b fix(mcs): remove info logs
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m58s
CI for csaxs_bec / test (pull_request) Successful in 1m54s
2026-03-04 09:13:55 +01:00
4d69f8f90f fix(bec_widgets): removed omny alignment old gui
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-02 21:00:14 +01:00
0f072a786e test: add tests for panda, fix tests for fermat scan
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-02 13:15:53 +01:00
05a1e3d8be refactor: cleanup docs and logs for most relevant devices 2026-03-02 13:15:53 +01:00
e9fd9084b8 refactor(csaxsdlpca200): cleanup docs 2026-03-02 13:15:53 +01:00
40ef387134 fix(panda): make complete asyncronous for PandaBoxOmny 2026-03-02 13:15:53 +01:00
x12sa
6ed84664f2 added docs for burst acquisition 2026-03-02 13:15:53 +01:00
x12sa
e5e3343da7 post startup script sessions for all setups, and required modifications 2026-03-02 13:15:53 +01:00
x12sa
c8866faccc logic for gain setting and readback for bpm amplifiers 2026-03-02 13:15:53 +01:00
x12sa
3b561c251c fix(config): remove panda test config 2026-03-02 13:15:53 +01:00
x12sa
bc187040ad refactor(panda-box): add pandaboxomny and refactor panda_box main integration 2026-03-02 13:15:53 +01:00
x12sa
efd27a27e8 fix(ferma-scan): fix flomni, lamni and omny fermat scans. add exp_time and frames_per_trigger 2026-03-02 13:15:53 +01:00
x12sa
7096ef3323 refactor(config): Update configs for bl_general and flomni 2026-03-02 13:15:53 +01:00
13378f24dd refactor(panda-box): refactor Pandabox, moving logic to base class 2026-03-02 13:15:53 +01:00
x01dc
f5b898ea1c feat: add panda box csaxs integration 2026-03-01 18:15:57 +01:00
3d62bea04b Update repo with template version v1.2.8
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m55s
CI for csaxs_bec / test (push) Successful in 1m54s
2026-02-27 16:25:22 +01:00
1518845d25 resolve merge conflicts 2026-02-27 16:25:22 +01:00
ff3b6686db Update repo with template version v1.2.7 2026-02-27 16:25:22 +01:00
afdc64e296 fix(rio): fix rio cached readings
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m40s
CI for csaxs_bec / test (push) Successful in 1m36s
2026-02-26 16:15:29 +01:00
bc31c00e1f fix(tests): x_ray_eye_align correct imports fixed after refactor of LamNI
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m36s
CI for csaxs_bec / test (push) Successful in 1m37s
2026-02-23 13:25:09 +01:00
x01dc
38671f074e minor printout fix
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m30s
CI for csaxs_bec / test (push) Failing after 1m32s
2026-02-23 12:44:04 +01:00
x01dc
92e39a5f75 minor adjmustment 2026-02-23 12:35:56 +01:00
x01dc
22c48115a4 final refactor step prior testing
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m33s
CI for csaxs_bec / test (push) Failing after 1m30s
2026-02-21 13:30:24 +01:00
2a7448526b new files for refactor
Some checks failed
CI for csaxs_bec / test (push) Failing after 36s
2026-02-21 13:02:35 +01:00
x01dc
a5825307e5 refactor remove previous files
Some checks failed
CI for csaxs_bec / test (push) Failing after 35s
2026-02-21 13:00:48 +01:00
52 changed files with 2960 additions and 2403 deletions

View File

@@ -2,7 +2,7 @@
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.2.2
_commit: v1.2.8
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: csaxs_bec

View File

@@ -28,7 +28,7 @@ on:
description: "Python version to use"
required: false
type: string
default: "3.11"
default: "3.12"
permissions:
pull-requests: write
@@ -44,7 +44,19 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/csaxs_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./csaxs_bec
- name: Lint for merge conflicts from template updates
shell: bash
# Find all Copier conflicts except this line
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
- name: Checkout BEC Core
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec
@@ -55,13 +67,6 @@ jobs:
- name: Checkout BEC Widgets
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/csaxs_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./csaxs_bec
- name: Install dependencies
shell: bash
run: |

View File

@@ -0,0 +1,62 @@
name: Create template upgrade PR for csaxs_bec
on:
workflow_dispatch:
permissions:
pull-requests: write
jobs:
create_update_branch_and_pr:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install tools
run: |
pip install copier PySide6
- name: Checkout
uses: actions/checkout@v4
- name: Perform update
run: |
git config --global user.email "bec_ci_staging@psi.ch"
git config --global user.name "BEC automated CI"
branch="chore/update-template-$(python -m uuid)"
echo "switching to branch $branch"
git checkout -b $branch
echo "Running copier update..."
output="$(copier update --trust --defaults --conflict inline 2>&1)"
echo "$output"
msg="$(printf '%s\n' "$output" | head -n 1)"
if ! grep -q "make_commit: true" .copier-answers.yml ; then
echo "Autocommit not made, committing..."
git add -A
git commit -a -m "$msg"
fi
if diff-index --quiet HEAD ; then
echo "No changes detected"
exit 0
fi
git push -u origin $branch
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"Template: $(echo $msg)\",
\"body\": \"This PR was created by Gitea Actions\",
\"head\": \"$(echo $branch)\",
\"base\": \"main\"
}"

View File

@@ -1,20 +0,0 @@
include:
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "csaxs"
target: "csaxs_bec"
branch: $CHILD_PIPELINE_BRANCH
pages:
stage: Deploy
needs: []
variables:
TARGET_BRANCH: $CI_COMMIT_REF_NAME
rules:
- if: "$CI_COMMIT_TAG != null"
variables:
TARGET_BRANCH: $CI_COMMIT_TAG
- if: '$CI_COMMIT_REF_NAME == "main" && $CI_PROJECT_PATH == "bec/csaxs_bec"'
script:
- curl -X POST -d "branches=$CI_COMMIT_REF_NAME" -d "token=$RTD_TOKEN" https://readthedocs.org/api/v2/webhook/sls-csaxs/270162/

View File

@@ -1,2 +1,6 @@
from .load_additional_correction import lamni_read_additional_correction
from .x_ray_eye_align import DataDrivenLamNI, LamNI, MagLamNI, XrayEyeAlign
from .alignment import XrayEyeAlign
from .lamni import LamNI
from .lamni_optics_mixin import LamNIInitError, LaMNIInitStages, LamNIOpticsMixin
__all__ = [
"LamNI", "XrayEyeAlign", "LamNIInitError", "LaMNIInitStages", "LamNIOpticsMixin"
]

View File

@@ -0,0 +1,461 @@
import builtins
import time
from collections import defaultdict
import numpy as np
from bec_lib import bec_logger
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class XrayEyeAlign:
# pixel calibration, multiply to get mm
# PIXEL_CALIBRATION = 0.2/209 #.2 with binning
PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning
def __init__(self, client, lamni) -> None:
self.client = client
self.lamni = lamni
self.device_manager = client.device_manager
self.scans = client.scans
self.alignment_values = defaultdict(list)
self._reset_init_values()
self.corr_pos_x = []
self.corr_pos_y = []
self.corr_angle = []
self.corr_pos_x_2 = []
self.corr_pos_y_2 = []
self.corr_angle_2 = []
# ------------------------------------------------------------------
# Correction reset
# ------------------------------------------------------------------
def reset_correction(self):
self.corr_pos_x = []
self.corr_pos_y = []
self.corr_angle = []
def reset_correction_2(self):
self.corr_pos_x_2 = []
self.corr_pos_y_2 = []
self.corr_angle_2 = []
def reset_xray_eye_correction(self):
self.client.delete_global_var("tomo_fit_xray_eye")
# ------------------------------------------------------------------
# FOV offset properties
# ------------------------------------------------------------------
@property
def tomo_fovx_offset(self):
val = self.client.get_global_var("tomo_fov_offset")
if val is None:
return 0.0
return val[0] / 1000
@tomo_fovx_offset.setter
@typechecked
def tomo_fovx_offset(self, val: float):
val_old = self.client.get_global_var("tomo_fov_offset")
if val_old is None:
val_old = [0.0, 0.0]
self.client.set_global_var("tomo_fov_offset", [val * 1000, val_old[1]])
@property
def tomo_fovy_offset(self):
val = self.client.get_global_var("tomo_fov_offset")
if val is None:
return 0.0
return val[1] / 1000
@tomo_fovy_offset.setter
@typechecked
def tomo_fovy_offset(self, val: float):
val_old = self.client.get_global_var("tomo_fov_offset")
if val_old is None:
val_old = [0.0, 0.0]
self.client.set_global_var("tomo_fov_offset", [val_old[0], val * 1000])
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _reset_init_values(self):
self.shift_xy = [0, 0]
self._xray_fov_xy = [0, 0]
def _disable_rt_feedback(self):
self.device_manager.devices.rtx.controller.feedback_disable()
def _enable_rt_feedback(self):
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable
umv(self.device_manager.devices.lsamrot, val)
def get_tomo_angle(self):
return self.device_manager.devices.lsamrot.readback.read()["lsamrot"]["value"]
# ------------------------------------------------------------------
# X-ray eye camera control
# ------------------------------------------------------------------
def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self):
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
# start live
epics_put("XOMNYI-XEYE-ACQ:0", 1)
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
fshopen()
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
time.sleep(0.5)
# stop live view
epics_put("XOMNYI-XEYE-ACQ:0", 0)
time.sleep(1)
print("got new frame")
def update_fov(self, k: int):
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
@property
def movement_buttons_enabled(self):
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
@movement_buttons_enabled.setter
def movement_buttons_enabled(self, enabled: bool):
enabled = int(enabled)
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
# ------------------------------------------------------------------
# Alignment procedure
# ------------------------------------------------------------------
def align(self):
self._reset_init_values()
self.reset_correction()
self.reset_correction_2()
self._disable_rt_feedback()
epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
self._enable_rt_feedback()
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-ACQ:0", 0)
self.send_message("please wait...")
epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", "Let us LAMNI...")
self._disable_rt_feedback()
k = 0
self.lamni.lfzp_in()
self.update_frame()
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-STEP:0", 0)
self.send_message("Submit center value of FZP.")
while True:
if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") * self.PIXEL_CALIBRATION # in mm
val_y = epics_get(f"XOMNYI-XEYE-YVAL_Y:{k}") * self.PIXEL_CALIBRATION # in mm
self.alignment_values[k] = [val_x, val_y]
print(
f"Clicked position {k}: x {self.alignment_values[k][0]}, y"
f" {self.alignment_values[k][1]}"
)
if k == 0: # received center value of FZP
self.send_message("please wait ...")
self.lamni.loptics_out()
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
print("Moving sample in, FZP out")
self._disable_rt_feedback()
time.sleep(0.3)
self._enable_rt_feedback()
time.sleep(0.3)
self.update_frame()
self.send_message("Go and find the sample")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled = True
elif k == 1: # received sample center value at samrot 0
msg = (
f"Base shift values from movement are x {self.shift_xy[0]}, y"
f" {self.shift_xy[1]}"
)
print(msg)
logger.info(msg)
self.shift_xy[0] += (
self.alignment_values[0][0] - self.alignment_values[1][0]
) * 1000
self.shift_xy[1] += (
self.alignment_values[1][1] - self.alignment_values[0][1]
) * 1000
print(
"Base shift values from movement and clicked position are x"
f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
time.sleep(1)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame()
self.send_message("Submit sample center and FOV (0 deg)")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.update_fov(k)
elif 1 < k < 10: # received sample center value at samrot 0 ... 315
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self._disable_rt_feedback()
self.tomo_rotate((k - 1) * 45 - 45 / 2)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
self._disable_rt_feedback()
self.tomo_rotate((k - 1) * 45)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame()
self.send_message("Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
self.update_fov(k)
elif k == 10: # received sample center value at samrot 270, done
self.send_message("done...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
self.update_fov(k)
break
k += 1
epics_put("XOMNYI-XEYE-STEP:0", k)
if k < 2:
_xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
_xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
if _xrayeyalignmvx != 0 or _xrayeyalignmvy != 0:
self.shift_xy[0] = self.shift_xy[0] + _xrayeyalignmvx
self.shift_xy[1] = self.shift_xy[1] + _xrayeyalignmvy
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
print(
f"Current center horizontal {self.shift_xy[0]} vertical {self.shift_xy[1]}"
)
epics_put("XOMNYI-XEYE-MVY:0", 0)
epics_put("XOMNYI-XEYE-MVX:0", 0)
self.update_frame()
time.sleep(0.2)
self.write_output()
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
fovy = self._xray_fov_xy[1] * self.PIXEL_CALIBRATION * 1000 / 2
print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns,"
f" fovy = {fovy:.0f} microns"
)
print("Use matlab routine to fit the current alignment...")
print(
"This additional shift is applied to the base shift values\n which are x"
f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
)
self._disable_rt_feedback()
self.tomo_rotate(0)
print(
"\n\nNEXT LOAD ALIGNMENT PARAMETERS\nby running"
" lamni.align.read_xray_eye_correction()\n"
)
self.client.set_global_var("tomo_fov_offset", self.shift_xy)
# ------------------------------------------------------------------
# Alignment output
# ------------------------------------------------------------------
def write_output(self):
import os
with open(
os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues"), "w"
) as alignment_values_file:
alignment_values_file.write("angle\thorizontal\tvertical\n")
for k in range(2, 11):
fovx_offset = (self.alignment_values[0][0] - self.alignment_values[k][0]) * 1000
fovy_offset = (self.alignment_values[k][1] - self.alignment_values[0][1]) * 1000
print(
f"Writing to file new alignment: number {k}, value x {fovx_offset}, y"
f" {fovy_offset}"
)
alignment_values_file.write(f"{(k-2)*45}\t{fovx_offset}\t{fovy_offset}\n")
# ------------------------------------------------------------------
# X-ray eye sinusoidal correction (loaded from MATLAB fit files)
# ------------------------------------------------------------------
def read_xray_eye_correction(self, dir_path=None):
import os
if dir_path is None:
dir_path = os.path.expanduser("~/Data10/specES1/internal/")
tomo_fit_xray_eye = np.zeros((2, 3))
for i, axis in enumerate(["x", "y"]):
for j, coeff in enumerate(["A", "B", "C"]):
with open(os.path.join(dir_path, f"ptychotomoalign_{coeff}{axis}.txt"), "r") as f:
tomo_fit_xray_eye[i][j] = f.readline()
self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist())
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
print("New alignment parameters loaded from X-ray eye")
print(
f"X Amplitude {tomo_fit_xray_eye[0][0]}, "
f"X Phase {tomo_fit_xray_eye[0][1]}, "
f"X Offset {tomo_fit_xray_eye[0][2]}, "
f"Y Amplitude {tomo_fit_xray_eye[1][0]}, "
f"Y Phase {tomo_fit_xray_eye[1][1]}, "
f"Y Offset {tomo_fit_xray_eye[1][2]}"
)
def lamni_compute_additional_correction_xeye_mu(self, angle):
"""Compute sinusoidal correction from the X-ray eye fit for the given angle."""
tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye")
if tomo_fit_xray_eye is None:
print("Not applying any additional correction. No x-ray eye data available.\n")
return (0, 0)
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
correction_x = (
tomo_fit_xray_eye[0][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[0][1])
+ tomo_fit_xray_eye[0][2]
) / 1000
correction_y = (
tomo_fit_xray_eye[1][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[1][1])
+ tomo_fit_xray_eye[1][2]
) / 1000
print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n")
return (correction_x, correction_y)
# ------------------------------------------------------------------
# Additional lookup-table corrections (iteration 1 and 2)
# ------------------------------------------------------------------
def read_additional_correction(self, correction_file: str):
self.corr_pos_x, self.corr_pos_y, self.corr_angle = self._read_correction_file_xy(
correction_file
)
def read_additional_correction_2(self, correction_file: str):
self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2 = self._read_correction_file_xy(
correction_file
)
def _read_correction_file_xy(self, correction_file: str):
"""Parse a correction file that contains corr_pos_x, corr_pos_y and corr_angle entries."""
with open(correction_file, "r") as f:
num_elements = f.readline()
int_num_elements = int(num_elements.split(" ")[2])
print(int_num_elements)
corr_pos_x = []
corr_pos_y = []
corr_angle = []
for j in range(0, int_num_elements * 3):
line = f.readline()
value = line.split(" ")[2]
name = line.split(" ")[0].split("[")[0]
if name == "corr_pos_x":
corr_pos_x.append(float(value) / 1000)
elif name == "corr_pos_y":
corr_pos_y.append(float(value) / 1000)
elif name == "corr_angle":
corr_angle.append(float(value))
return corr_pos_x, corr_pos_y, corr_angle
def compute_additional_correction(self, angle):
return self._compute_correction_xy(
angle, self.corr_pos_x, self.corr_pos_y, self.corr_angle, label="1"
)
def compute_additional_correction_2(self, angle):
return self._compute_correction_xy(
angle, self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2, label="2"
)
def _compute_correction_xy(self, angle, corr_pos_x, corr_pos_y, corr_angle, label=""):
"""Find the correction for the closest angle in the lookup table."""
if not corr_pos_x:
print(f"Not applying additional correction {label}. No data available.\n")
return (0, 0)
shift_x = corr_pos_x[0]
shift_y = corr_pos_y[0]
angledelta = np.fabs(corr_angle[0] - angle)
for j in range(1, len(corr_pos_x)):
newangledelta = np.fabs(corr_angle[j] - angle)
if newangledelta < angledelta:
shift_x = corr_pos_x[j]
shift_y = corr_pos_y[j]
angledelta = newangledelta
if shift_x == 0 and angle < corr_angle[0]:
shift_x = corr_pos_x[0]
shift_y = corr_pos_y[0]
if shift_x == 0 and angle > corr_angle[-1]:
shift_x = corr_pos_x[-1]
shift_y = corr_pos_y[-1]
print(f"Additional correction shifts {label}: {shift_x} {shift_y}")
return (shift_x, shift_y)

View File

@@ -0,0 +1,211 @@
"""
extra_tomo.py
=============
Specialist LamNI subclasses for specific experimental configurations.
Import explicitly when needed, e.g.:
from csaxs_bec...extra_tomo import MagLamNI
from csaxs_bec...extra_tomo import DataDrivenLamNI
"""
import builtins
import datetime
import os
import time
import h5py
import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from .lamni import LamNI
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
scans = builtins.__dict__.get("scans")
class MagLamNI(LamNI):
"""LamNI subclass for magnetic experiments (XMCD).
Adds a slow rotation helper and allows injection of a custom
per-angle callback via the ``lamni_at_each_angle`` builtin.
"""
def sub_tomo_scan(self, subtomo_number, start_angle=None):
super().sub_tomo_scan(subtomo_number, start_angle)
# self.rotate_slowly(0)
def rotate_slowly(self, angle, step_size=20):
"""Rotate to target angle in small steps to avoid mechanical stress."""
current_angle = dev.lsamrot.read(cached=True)["value"]
steps = int(np.ceil(np.abs(current_angle - angle) / step_size)) + 1
for target_angle in np.linspace(current_angle, angle, steps, endpoint=True):
umv(dev.lsamrot, target_angle)
scans.lamni_move_to_scan_center(
self.align.tomo_fovx_offset, self.align.tomo_fovy_offset, target_angle
)
def _at_each_angle(self, angle: float) -> None:
if "lamni_at_each_angle" in builtins.__dict__:
# pylint: disable=undefined-variable
lamni_at_each_angle(self, angle)
return
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
class DataDrivenLamNI(LamNI):
"""LamNI subclass that reads per-projection scan parameters from an HDF5 file.
Instead of a fixed FOV and step size for the whole tomogram, each
projection can have individual values for step size, loptz position
and lateral shifts, as specified in a datadriven_params.h5 file.
"""
def __init__(self, client):
super().__init__(client)
self.tomo_data = {}
def tomo_scan(
self,
subtomo_start=1,
start_index=None,
fname="~/Data10/data_driven_config/datadriven_params.h5",
):
"""Start a data-driven tomo scan.
Args:
subtomo_start (int): Unused; kept for API compatibility. Use start_index to resume.
start_index (int, optional): Skip projections before this index. Defaults to None.
fname (str): Path to the HDF5 parameter file. Defaults to the standard location.
"""
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
fname = os.path.expanduser(fname)
if not os.path.exists(fname):
raise FileNotFoundError(f"Could not find datadriven params file in {fname}.")
content = f"Loading tomo parameters from {fname}."
logger.warning(content)
msg = bec.logbook.LogbookMessage()
msg.add_text(content).add_tag(["Data_driven_file", "BEC"])
self.client.logbook.send_logbook_message(msg)
self._update_tomo_data_from_file(fname)
self._current_special_angles = self.special_angles.copy()
if subtomo_start == 1 and start_index is None:
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account.decode(),
bec.queue.next_scan_number,
"lamni",
"test additional info",
"BEC",
)
self.write_pdf_report()
with scans.dataset_id_on_hold:
self.sub_tomo_data_driven(start_index)
def sub_tomo_scan(self, subtomo_number=None, start_angle=None):
raise NotImplementedError(
"Cannot run sub_tomo_scan with DataDrivenLamNI. "
"Use lamni.tomo_scan(start_index=<N>) to resume instead."
)
def _at_each_angle(
self, angle=None, stepsize=None, loptz_pos=None, manual_shift_x=0, manual_shift_y=0
):
self.manual_shift_x = manual_shift_x
self.manual_shift_y = manual_shift_y
self.tomo_shellstep = stepsize
if loptz_pos is not None:
dev.rtx.controller.feedback_disable()
umv(dev.loptz, loptz_pos)
super()._at_each_angle(angle=angle)
def sub_tomo_data_driven(self, start_index=None):
"""Iterate over all projections defined in the loaded HDF5 parameter file."""
for scan_index, scan_data in enumerate(zip(*self.tomo_data.values())):
if start_index and scan_index < start_index:
continue
(
angle,
stepsize,
loptz_pos,
propagation_distance,
manual_shift_x,
manual_shift_y,
subtomo_number,
) = scan_data
bec.metadata.update(
{key: float(val) for key, val in zip(self.tomo_data.keys(), scan_data)}
)
successful = False
error_caught = False
if 0 <= angle < 360.05:
print(f"Starting LamNI scan for angle {angle}")
while not successful:
self._start_beam_check()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(
float(angle),
stepsize=float(stepsize),
loptz_pos=float(loptz_pos),
manual_shift_x=float(manual_shift_x),
manual_shift_y=float(manual_shift_y),
)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
raise exc
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
def _update_tomo_data_from_file(self, fname: str) -> None:
"""Load projection parameters from the HDF5 file into self.tomo_data."""
with h5py.File(fname, "r") as file:
self.tomo_data["theta"] = np.array([*file["theta"]]).flatten()
self.tomo_data["stepsize"] = np.array([*file["stepsize"]]).flatten()
self.tomo_data["loptz"] = np.array([*file["loptz"]]).flatten()
self.tomo_data["propagation_distance"] = np.array(
[*file["relative_propagation_distance"]]
).flatten()
self.tomo_data["manual_shift_x"] = np.array([*file["manual_shift_x"]]).flatten()
self.tomo_data["manual_shift_y"] = np.array([*file["manual_shift_y"]]).flatten()
self.tomo_data["subtomo_id"] = np.array([*file["subtomo_id"]]).flatten()
shapes = [data.shape for data in self.tomo_data.values()]
if len(set(shapes)) > 1:
raise ValueError(f"Tomo data file has entries of inconsistent lengths: {shapes}.")

File diff suppressed because it is too large Load Diff

View File

@@ -8,18 +8,20 @@ from rich.table import Table
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
# import builtins to avoid linter errors
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class LamNIInitError(Exception):
pass
class LaMNIInitStagesMixin:
class LaMNIInitStages:
"""Handles hardware initialization and referencing of LamNI stages."""
def __init__(self, client):
super().__init__()
self.client = client
@@ -28,14 +30,11 @@ class LaMNIInitStagesMixin:
def lamni_init_stages(self):
if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"):
print("staring...")
print("starting...")
dev.lsamrot.enabled = True
else:
return
if self.check_all_axes_of_lamni_referenced():
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
print("ok then...")
@@ -44,7 +43,6 @@ class LaMNIInitStagesMixin:
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
if self.OMNYTools.yesno("The rotation stage will be moved to one limit"):
print("starting...")
else:
@@ -56,7 +54,9 @@ class LaMNIInitStagesMixin:
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
return
if self.OMNYTools.yesno("Init of loptz. Can the stage move to the upstream limit without collision?"):
if self.OMNYTools.yesno(
"Init of loptz. Can the stage move to the upstream limit without collision?"
):
print("ok then...")
else:
return
@@ -81,13 +81,14 @@ class LaMNIInitStagesMixin:
self.drive_axis_to_limit(dev.lsamy, "reverse")
self.find_reference_mark(dev.lsamy)
# the dual encoder requires the reference mark to pass on both encoders
print("Referencing lsamrot")
self.drive_axis_to_limit(dev.lsamrot, "reverse")
time.sleep(0.1)
self.find_reference_mark(dev.lsamrot)
if self.OMNYTools.yesno("Init of leye. Can the stage move to -x limit without collision?"):
if self.OMNYTools.yesno(
"Init of leye. Can the stage move to -x limit without collision?"
):
print("starting...")
else:
return
@@ -97,15 +98,6 @@ class LaMNIInitStagesMixin:
print("Referencing leyey")
self.drive_axis_to_limit(dev.leyey, "forward")
# set_lm lsamx 6 14
# set_lm lsamy 6 14
# set_lm lsamrot -3 362
# set_lm loptx -1 -0.2
# set_lm lopty 3.0 3.6
# set_lm loptz 82 87
# set_lm leyex 0 25
# set_lm leyey 0.5 50
print("Init of Smaract stages")
dev.losax.controller.find_reference_mark(2, 0, 1000, 1)
time.sleep(1)
@@ -113,15 +105,6 @@ class LaMNIInitStagesMixin:
time.sleep(1)
dev.losax.controller.find_reference_mark(1, 0, 1000, 1)
time.sleep(1)
# dev.losax.controller.find_reference_mark(3, 1, 1000, 1)
# time.sleep(1)
# dev.losax.controller.find_reference_mark(4, 1, 1000, 1)
# time.sleep(1)
# set_lm losax -1.5 0.25
# set_lm losay -2.5 4.1
# set_lm losaz -4.1 -0.5
# set_lm lcsy -1.5 5
self._align_setup()
@@ -178,6 +161,8 @@ class LaMNIInitStagesMixin:
class LamNIOpticsMixin:
"""Optics movement methods: FZP, OSA, central stop and X-ray eye."""
@staticmethod
def _get_user_param_safe(device, var):
param = dev[device].user_parameter
@@ -192,13 +177,11 @@ class LamNIOpticsMixin:
umv(dev.leyey, leyey_out)
epics_put("XOMNYI-XEYE-ACQ:0", 2)
# move rotation stage to zero to avoid problems with wires
umv(dev.lsamrot, 0)
umv(dev.dttrz, 5854, dev.fttrz, 2395)
def leye_in(self):
bec.queue.next_dataset_number += 1
# move rotation stage to zero to avoid problems with wires
umv(dev.lsamrot, 0)
umv(dev.dttrz, 6419.677, dev.fttrz, 2959.979)
while True:
@@ -215,15 +198,10 @@ class LamNIOpticsMixin:
def _lfzp_in(self):
loptx_in = self._get_user_param_safe("loptx", "in")
lopty_in = self._get_user_param_safe("lopty", "in")
umv(
dev.loptx, loptx_in, dev.lopty, lopty_in
) # for 7.2567 keV and 150 mu, 60 nm fzp, loptz 83.6000 for propagation 1.4 mm
umv(dev.loptx, loptx_in, dev.lopty, lopty_in)
def lfzp_in(self):
"""
move in the lamni zone plate.
This will disable rt feedback, move the FZP and re-enabled the feedback.
"""
"""Move in the LamNI zone plate, disabling/re-enabling RT feedback around the move."""
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.controller.feedback_disable()
@@ -233,18 +211,15 @@ class LamNIOpticsMixin:
dev.rtx.controller.feedback_enable_with_reset()
def loptics_in(self):
"""
Move in the lamni optics, including the FZP and the OSA.
"""
"""Move in the LamNI optics (FZP + OSA)."""
self.lfzp_in()
self.losa_in()
def loptics_out(self):
"""Move out the lamni optics"""
"""Move out the LamNI optics."""
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.controller.feedback_disable()
# self.lcs_out()
self.losa_out()
loptx_out = self._get_user_param_safe("loptx", "out")
lopty_out = self._get_user_param_safe("lopty", "out")
@@ -255,28 +230,17 @@ class LamNIOpticsMixin:
dev.rtx.controller.feedback_enable_with_reset()
def lcs_in(self):
# umv lcsx -1.852 lcsy -0.095
pass
def lcs_out(self):
umv(dev.lcsy, 3)
def losa_in(self):
# 6.2 keV, 170 um FZP
# umv(dev.losax, -1.4450000, dev.losay, -0.1800)
# umv(dev.losaz, -1)
# 6.7, 170
# umv(dev.losax, -1.4850, dev.losay, -0.1930)
# umv(dev.losaz, 1.0000)
# 7.2, 150
losax_in = self._get_user_param_safe("losax", "in")
losay_in = self._get_user_param_safe("losay", "in")
losaz_in = self._get_user_param_safe("losaz", "in")
umv(dev.losax, losax_in, dev.losay, losay_in)
umv(dev.losaz, losaz_in)
# 11 kev
# umv(dev.losax, -1.161000, dev.losay, -0.196)
# umv(dev.losaz, 1.0000)
def losa_out(self):
losay_out = self._get_user_param_safe("losay", "out")
@@ -285,11 +249,10 @@ class LamNIOpticsMixin:
umv(dev.losay, losay_out)
def lfzp_info(self, mokev_val=-1):
if mokev_val == -1:
try:
mokev_val = dev.mokev.readback.get()
except:
except Exception:
print(
"Device mokev does not exist. You can specify the energy in keV as an argument instead."
)
@@ -324,10 +287,6 @@ class LamNIOpticsMixin:
)
console.print(table)
print("OSA Information:")
# print(f"Current losaz %.1f\n", A[losaz])
# print("The OSA will collide with the sample plane at %.1f\n\n", 89.3-A[loptz])
print(
"The numbers presented here are for a sample in the plane of the lamni sample holder.\n"
)

View File

@@ -1,22 +0,0 @@
def lamni_read_additional_correction():
# "additional_correction_shift"
# [0][] x , [1][] y, [2][] angle, [3][0] number of elements
with open("correction_lamni_um_S01405_.txt", "r") as f:
num_elements = f.readline()
int_num_elements = int(num_elements.split(" ")[2])
print(int_num_elements)
corr_pos_x = []
corr_pos_y = []
corr_angle = []
for j in range(0, int_num_elements * 3):
line = f.readline()
value = line.split(" ")[2]
name = line.split(" ")[0].split("[")[0]
if name == "corr_pos_x":
corr_pos_x.append(value)
elif name == "corr_pos_y":
corr_pos_y.append(value)
elif name == "corr_angle":
corr_angle.append(value)
return (corr_pos_x, corr_pos_y, corr_angle, num_elements)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,442 @@
"""
csaxs_dlpca200.py
=================
BEC control script for FEMTO DLPCA-200 Variable Gain Low Noise Current Amplifiers
connected to Galil RIO digital outputs.
DLPCA-200 Remote Control (datasheet page 4)
-------------------------------------------
Sub-D pin -> function:
Pin 10 -> gain LSB (digital out channel, index 0 in bit-tuple)
Pin 11 -> gain MID (digital out channel, index 1 in bit-tuple)
Pin 12 -> gain MSB (digital out channel, index 2 in bit-tuple)
Pin 13 -> coupling LOW = AC, HIGH = DC
Pin 14 -> speed mode HIGH = low noise (Pin14=1), LOW = high speed (Pin14=0)
Gain truth table (MSB, MID, LSB):
0,0,0 -> low-noise: 1e3 high-speed: 1e5
0,0,1 -> low-noise: 1e4 high-speed: 1e6
0,1,0 -> low-noise: 1e5 high-speed: 1e7
0,1,1 -> low-noise: 1e6 high-speed: 1e8
1,0,0 -> low-noise: 1e7 high-speed: 1e9
1,0,1 -> low-noise: 1e8 high-speed: 1e10
1,1,0 -> low-noise: 1e9 high-speed: 1e11
Strategy: prefer low-noise mode (1e3-1e9). For 1e10 and 1e11,
automatically fall back to high-speed mode.
Device wiring example (galilrioesxbox):
bpm4: Pin10->ch0, Pin11->ch1, Pin12->ch2, Pin13->ch3, Pin14->ch4
bim: Pin10->ch6, Pin11->ch7, Pin12->ch8, Pin13->ch9, Pin14->ch10
Usage examples
--------------
csaxs_amp = cSAXSDLPCA200(client)
csaxs_amp.set_gain("bpm4", 1e7) # low-noise if possible
csaxs_amp.set_gain("bim", 1e10) # auto high-speed
csaxs_amp.set_coupling("bpm4", "DC")
csaxs_amp.set_coupling("bim", "AC")
csaxs_amp.info("bpm4") # print current settings
csaxs_amp.info_all() # print all configured amplifiers
"""
import builtins
from bec_lib import bec_logger
logger = bec_logger.logger
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
# ---------------------------------------------------------------------------
# Amplifier registry
# ---------------------------------------------------------------------------
# Each entry describes one DLPCA-200 amplifier connected to a Galil RIO.
#
# Keys inside "channels":
# gain_lsb -> digital output channel number wired to DLPCA-200 Pin 10
# gain_mid -> digital output channel number wired to DLPCA-200 Pin 11
# gain_msb -> digital output channel number wired to DLPCA-200 Pin 12
# coupling -> digital output channel number wired to DLPCA-200 Pin 13
# speed_mode -> digital output channel number wired to DLPCA-200 Pin 14
#
# To add a new amplifier, simply extend this dict.
# ---------------------------------------------------------------------------
DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
"bpm4": {
"rio_device": "galilrioesxbox",
"description": "Beam Position Monitor 4 current amplifier",
"channels": {
"gain_lsb": 0, # Pin 10 -> Galil ch0
"gain_mid": 1, # Pin 11 -> Galil ch1
"gain_msb": 2, # Pin 12 -> Galil ch2
"coupling": 3, # Pin 13 -> Galil ch3
"speed_mode": 4, # Pin 14 -> Galil ch4
},
},
"bim": {
"rio_device": "galilrioesxbox",
"description": "Beam Intensity Monitor current amplifier",
"channels": {
"gain_lsb": 6, # Pin 10 -> Galil ch6
"gain_mid": 7, # Pin 11 -> Galil ch7
"gain_msb": 8, # Pin 12 -> Galil ch8
"coupling": 9, # Pin 13 -> Galil ch9
"speed_mode": 10, # Pin 14 -> Galil ch10
},
},
}
# ---------------------------------------------------------------------------
# DLPCA-200 gain encoding tables
# ---------------------------------------------------------------------------
# (msb, mid, lsb) -> gain in V/A
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
(0, 0, 1): int(1e4),
(0, 1, 0): int(1e5),
(0, 1, 1): int(1e6),
(1, 0, 0): int(1e7),
(1, 0, 1): int(1e8),
(1, 1, 0): int(1e9),
}
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
(0, 0, 0): int(1e5),
(0, 0, 1): int(1e6),
(0, 1, 0): int(1e7),
(0, 1, 1): int(1e8),
(1, 0, 0): int(1e9),
(1, 0, 1): int(1e10),
(1, 1, 0): int(1e11),
}
# Inverse maps: gain -> (msb, mid, lsb, low_noise_flag)
# low_noise_flag: True = Pin14 HIGH, False = Pin14 LOW
_GAIN_TO_BITS: dict[int, tuple] = {}
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
_GAIN_TO_BITS[_gain] = (*_bits, True)
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
_GAIN_TO_BITS[_gain] = (*_bits, False)
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class cSAXSDLPCA200Error(Exception):
pass
class cSAXSDLPCA200:
"""
Control class for FEMTO DLPCA-200 current amplifiers connected via Galil RIO
digital outputs in a BEC environment.
Supports:
- Forward control: set_gain(), set_coupling()
- Readback reporting: info(), info_all(), read_settings()
- Robust error handling and logging following cSAXS conventions.
"""
TAG = "[DLPCA200]"
def __init__(self, client, config: dict | None = None) -> None:
"""
Parameters
----------
client : BEC client object (passed through for future use)
config : optional override for DLPCA200_AMPLIFIER_CONFIG.
Falls back to the module-level dict if not provided.
"""
self.client = client
self._config: dict[str, dict] = config if config is not None else DLPCA200_AMPLIFIER_CONFIG
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _require_dev(self) -> None:
if dev is None:
raise cSAXSDLPCA200Error(
f"{self.TAG} BEC 'dev' namespace is not available in this session."
)
def _get_cfg(self, amp_name: str) -> dict:
"""Return config dict for a named amplifier, raising on unknown names."""
if amp_name not in self._config:
known = ", ".join(sorted(self._config.keys()))
raise cSAXSDLPCA200Error(f"{self.TAG} Unknown amplifier '{amp_name}'. Known: [{known}]")
return self._config[amp_name]
def _get_rio(self, amp_name: str):
"""Return the live RIO device object for a given amplifier."""
self._require_dev()
cfg = self._get_cfg(amp_name)
rio_name = cfg["rio_device"]
try:
rio = getattr(dev, rio_name)
except AttributeError:
raise cSAXSDLPCA200Error(f"{self.TAG} RIO device '{rio_name}' not found in BEC 'dev'.")
return rio
def _dout_get(self, rio, ch: int) -> int:
"""Read one digital output channel (returns 0 or 1)."""
attr = getattr(rio.digital_out, f"ch{ch}")
val = attr.get()
return int(val)
def _dout_set(self, rio, ch: int, value: bool) -> None:
"""Write one digital output channel (True=HIGH=1, False=LOW=0)."""
attr = getattr(rio.digital_out, f"ch{ch}")
attr.set(value)
def _read_gain_bits(self, amp_name: str) -> tuple[int, int, int, int]:
"""
Read current gain bit-state from hardware.
Returns
-------
(msb, mid, lsb, speed_mode)
speed_mode: 1 = low-noise (Pin14=HIGH), 0 = high-speed (Pin14=LOW)
"""
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
msb = self._dout_get(rio, ch["gain_msb"])
mid = self._dout_get(rio, ch["gain_mid"])
lsb = self._dout_get(rio, ch["gain_lsb"])
speed_mode = self._dout_get(rio, ch["speed_mode"])
return msb, mid, lsb, speed_mode
def _decode_gain(self, msb: int, mid: int, lsb: int, speed_mode: int) -> int | None:
"""
Decode hardware bit-state into gain value (V/A).
speed_mode=1 -> low-noise table, speed_mode=0 -> high-speed table.
Returns None if the bit combination is not in the table.
"""
bits = (msb, mid, lsb)
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
# ------------------------------------------------------------------
# Public API - control
# ------------------------------------------------------------------
def set_gain(self, amp_name: str, gain: float, force_high_speed: bool = False) -> None:
"""
Set the transimpedance gain of a DLPCA-200 amplifier.
The method automatically selects low-noise mode (Pin14=HIGH) whenever
the requested gain is achievable in low-noise mode (1e3 - 1e9 V/A).
For gains of 1e10 and 1e11 V/A, high-speed mode is used automatically.
Parameters
----------
amp_name : str
Amplifier name as defined in DLPCA200_AMPLIFIER_CONFIG (e.g. "bpm4").
gain : float or int
Target gain in V/A. Must be one of:
1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11.
force_high_speed : bool, optional
If True, force high-speed (low-noise=False) mode even for gains
below 1e10. Default: False (prefer low-noise).
Examples
--------
csaxs_amp.set_gain("bpm4", 1e7) # low-noise mode (automatic)
csaxs_amp.set_gain("bim", 1e10) # high-speed mode (automatic)
csaxs_amp.set_gain("bpm4", 1e7, force_high_speed=True) # override to high-speed
"""
gain_int = int(gain)
if gain_int not in _GAIN_TO_BITS:
valid_str = ", ".join(
f"1e{int(round(__import__('math').log10(g)))}" for g in VALID_GAINS
)
raise cSAXSDLPCA200Error(
f"{self.TAG} Invalid gain {gain:.2e} V/A for '{amp_name}'. "
f"Valid values: {valid_str}"
)
msb, mid, lsb, low_noise_preferred = _GAIN_TO_BITS[gain_int]
# Apply force_high_speed override
if force_high_speed and low_noise_preferred:
# Check if this gain is achievable in high-speed mode
hs_entry = next(
(bits for bits, g in _GAIN_BITS_HIGH_SPEED.items() if g == gain_int), None
)
if hs_entry is None:
raise cSAXSDLPCA200Error(
f"{self.TAG} Gain {gain:.2e} V/A is not achievable in high-speed mode "
f"for '{amp_name}'."
)
msb, mid, lsb = hs_entry
low_noise_preferred = False
use_low_noise = low_noise_preferred and not force_high_speed
try:
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
self._dout_set(rio, ch["gain_msb"], bool(msb))
self._dout_set(rio, ch["gain_mid"], bool(mid))
self._dout_set(rio, ch["gain_lsb"], bool(lsb))
self._dout_set(rio, ch["speed_mode"], use_low_noise) # True=low-noise
mode_str = "low-noise" if use_low_noise else "high-speed"
logger.info(
f"{self.TAG} [{amp_name}] gain set to {gain_int:.2e} V/A "
f"({mode_str} mode, bits MSB={msb} MID={mid} LSB={lsb})"
)
print(
f"{amp_name}: gain -> {gain_int:.2e} V/A [{mode_str}] "
f"(bits: MSB={msb} MID={mid} LSB={lsb})"
)
except cSAXSDLPCA200Error:
raise
except Exception as exc:
raise cSAXSDLPCA200Error(
f"{self.TAG} Failed to set gain on '{amp_name}': {exc}"
) from exc
def set_coupling(self, amp_name: str, coupling: str) -> None:
"""
Set AC or DC coupling on a DLPCA-200 amplifier.
Parameters
----------
amp_name : str
Amplifier name (e.g. "bpm4", "bim").
coupling : str
"AC" or "DC" (case-insensitive).
DC -> Pin13 HIGH, AC -> Pin13 LOW.
Examples
--------
csaxs_amp.set_coupling("bpm4", "DC")
csaxs_amp.set_coupling("bim", "AC")
"""
coupling_upper = coupling.strip().upper()
if coupling_upper not in ("AC", "DC"):
raise cSAXSDLPCA200Error(
f"{self.TAG} Invalid coupling '{coupling}' for '{amp_name}'. " f"Use 'AC' or 'DC'."
)
pin13_high = coupling_upper == "DC"
try:
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
self._dout_set(rio, ch["coupling"], pin13_high)
logger.info(f"{self.TAG} [{amp_name}] coupling set to {coupling_upper}")
print(f"{amp_name}: coupling -> {coupling_upper}")
except cSAXSDLPCA200Error:
raise
except Exception as exc:
raise cSAXSDLPCA200Error(
f"{self.TAG} Failed to set coupling on '{amp_name}': {exc}"
) from exc
# ------------------------------------------------------------------
# Public API - readback / reporting
# ------------------------------------------------------------------
def read_settings(self, amp_name: str) -> dict:
"""
Read back the current settings from hardware digital outputs.
Returns
-------
dict with keys:
"amp_name" : str
"gain" : int or None - gain in V/A (None if unknown bit pattern)
"mode" : str - "low-noise" or "high-speed"
"coupling" : str - "AC" or "DC"
"bits" : dict - raw bit values {msb, mid, lsb, speed_mode, coupling}
"""
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
msb = self._dout_get(rio, ch["gain_msb"])
mid = self._dout_get(rio, ch["gain_mid"])
lsb = self._dout_get(rio, ch["gain_lsb"])
speed_mode = self._dout_get(rio, ch["speed_mode"])
coupling_bit = self._dout_get(rio, ch["coupling"])
gain = self._decode_gain(msb, mid, lsb, speed_mode)
mode = "low-noise" if speed_mode else "high-speed"
coupling = "DC" if coupling_bit else "AC"
return {
"amp_name": amp_name,
"gain": gain,
"mode": mode,
"coupling": coupling,
"bits": {
"msb": msb,
"mid": mid,
"lsb": lsb,
"speed_mode": speed_mode,
"coupling": coupling_bit,
},
}
def info(self, amp_name: str) -> None:
"""
Print a plain summary of the current settings for one amplifier.
Example output
--------------
Amplifier : bpm4
Description : Beam Position Monitor 4 current amplifier
RIO device : galilrioesxbox
Gain : 1.00e+07 V/A
Mode : low-noise
Coupling : DC
Raw bits : MSB=1 MID=0 LSB=0 speed=1 coup=1
"""
cfg = self._get_cfg(amp_name)
try:
s = self.read_settings(amp_name)
except Exception as exc:
print(f"{self.TAG} [{amp_name}] Could not read settings: {exc}")
return
gain_str = (
f"{s['gain']:.2e} V/A" if s["gain"] is not None else "UNKNOWN (invalid bit pattern)"
)
bits = s["bits"]
print(f" {'Amplifier':<12}: {amp_name}")
print(f" {'Description':<12}: {cfg.get('description', '')}")
print(f" {'RIO device':<12}: {cfg['rio_device']}")
print(f" {'Gain':<12}: {gain_str}")
print(f" {'Mode':<12}: {s['mode']}")
print(f" {'Coupling':<12}: {s['coupling']}")
print(
f" {'Raw bits':<12}: MSB={bits['msb']} MID={bits['mid']} LSB={bits['lsb']} speed={bits['speed_mode']} coup={bits['coupling']}"
)
def info_all(self) -> None:
"""
Print a plain summary for ALL configured amplifiers.
"""
print("\nDLPCA-200 Amplifier Status Report")
print("-" * 40)
for amp_name in sorted(self._config.keys()):
self.info(amp_name)
print()
def list_amplifiers(self) -> list[str]:
"""Return sorted list of configured amplifier names."""
return sorted(self._config.keys())

View File

@@ -41,8 +41,10 @@ import builtins
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class cSAXSFilterTransmission:
"""

View File

@@ -8,11 +8,14 @@ from bec_lib import bec_logger
logger = bec_logger.logger
# Pull BEC globals if present
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class cSAXSInitSmaractStagesError(Exception):
pass
@@ -383,7 +386,6 @@ class cSAXSInitSmaractStages:
if not self._yesno("Proceed with the motions listed above?", "y"):
logger.info("[cSAXS] Motion to initial position aborted by user.")
return
# --- Execution phase (SIMULTANEOUS MOTION) ---
if umv is None:
logger.error("[cSAXS] 'umv' is not available in this session.")

View File

@@ -22,8 +22,10 @@ logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class FlomniToolsError(Exception):

View File

@@ -7,8 +7,10 @@ from bec_widgets.cli.client import BECDockArea
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class flomniGuiToolsError(Exception):

View File

@@ -13,8 +13,10 @@ logger = bec_logger.logger
# import builtins to avoid linter errors
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
if TYPE_CHECKING:
from bec_ipython_client.plugins.flomni import Flomni

View File

@@ -7,8 +7,10 @@ from bec_widgets.cli.client import BECDockArea
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYGuiToolsError(Exception):

View File

@@ -27,9 +27,10 @@ logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYInitError(Exception):
pass

View File

@@ -16,8 +16,10 @@ from rich.table import Table
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYToolsError(Exception):

View File

@@ -16,8 +16,10 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fsh
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYTransferError(Exception):

View File

@@ -13,8 +13,10 @@ logger = bec_logger.logger
# import builtins to avoid linter errors
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
if TYPE_CHECKING:
from bec_ipython_client.plugins.omny import OMNY

View File

@@ -30,29 +30,74 @@ logger = bec_logger.logger
logger.info("Using the cSAXS startup script.")
# pylint: disable=import-error
_args = _main_dict["args"]
_session_name = "cSAXS"
if _args.session.lower() == "lamni":
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
from csaxs_bec.bec_ipython_client.plugins.LamNI import *
_session_name = "LamNI"
lamni = LamNI(bec)
logger.success("LamNI session loaded.")
elif _args.session.lower() == "csaxs":
print("Loading cSAXS session")
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
logger.success("cSAXS session loaded.")
from csaxs_bec.bec_ipython_client.plugins.tool_box.debug_tools import DebugTools
debug = DebugTools()
logger.success("Debug tools loaded. Use 'debug' to access them.")
# pylint: disable=import-error
_args = _main_dict["args"]
_session_name = "cSAXS"
print("Loading cSAXS session")
from csaxs_bec.bec_ipython_client.plugins.cSAXS.cSAXS import cSAXS
csaxs = cSAXS(bec)
logger.success("cSAXS session loaded.")
if _args.session.lower() == "lamni":
from csaxs_bec.bec_ipython_client.plugins.LamNI import LamNI
_session_name = "LamNI"
lamni = LamNI(bec)
logger.success("LamNI session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ██╗ █████╗ ███╗ ███╗███╗ ██╗██╗
██╔══██╗██╔════╝██╔════╝ ██║ ██╔══██╗████╗ ████║████╗ ██║██║
██████╔╝█████╗ ██║ ██║ ███████║██╔████╔██║██╔██╗ ██║██║
██╔══██╗██╔══╝ ██║ ██║ ██╔══██║██║╚██╔╝██║██║╚██╗██║██║
██████╔╝███████╗╚██████╗ ███████╗██║ ██║██║ ╚═╝ ██║██║ ╚████║██║
╚═════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
B E C L a m N I
""")
elif _args.session.lower() == "omny":
from csaxs_bec.bec_ipython_client.plugins.flomni import OMNY
_session_name = "OMNY"
omny = OMNY(bec)
logger.success("OMNY session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ██████╗ ███╗ ███╗███╗ ██╗██╗ ██╗
██╔══██╗██╔════╝██╔════╝ ██╔═══██╗████╗ ████║████╗ ██║╚██╗ ██╔╝
██████╔╝█████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║ ╚████╔╝
██╔══██╗██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║ ╚██╔╝
██████╔╝███████╗╚██████╗ ╚██████╔╝██║ ╚═╝ ██║██║ ╚████║ ██║
╚═════╝ ╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝
B E C O M N Y
""")
elif _args.session.lower() == "flomni":
from csaxs_bec.bec_ipython_client.plugins.flomni import Flomni
_session_name = "flomni"
flomni = Flomni(bec)
logger.success("flomni session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗
██╔══██╗██╔════╝██╔════╝ ██╔════╝██║ ██╔═══██╗████╗ ████║████╗ ██║██║
██████╔╝█████╗ ██║ █████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║██║
██╔══██╗██╔══╝ ██║ ██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║██║
██████╔╝███████╗╚██████╗ ██║ ███████╗╚██████╔╝██║ ╚═╝ ██║██║ ╚████║██║
╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
B E C f l O M N I
""")
# SETUP BEAMLINE INFO
from bec_ipython_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo

View File

@@ -13,69 +13,10 @@ logger = bec_logger.logger
_Widgets = {
"OmnyAlignment": "OmnyAlignment",
"XRayEye": "XRayEye",
}
class OmnyAlignment(RPCBase):
@property
@rpc_call
def enable_live_view(self):
"""
None
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
None
"""
@property
@rpc_call
def user_message(self):
"""
None
"""
@user_message.setter
@rpc_call
def user_message(self):
"""
None
"""
@property
@rpc_call
def sample_name(self):
"""
None
"""
@sample_name.setter
@rpc_call
def sample_name(self):
"""
None
"""
@property
@rpc_call
def enable_move_buttons(self):
"""
None
"""
@enable_move_buttons.setter
@rpc_call
def enable_move_buttons(self):
"""
None
"""
class XRayEye(RPCBase):
@rpc_call
def active_roi(self) -> "BaseROI | None":
@@ -83,20 +24,6 @@ class XRayEye(RPCBase):
Return the currently active ROI, or None if no ROI is active.
"""
@property
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@property
@rpc_call
def user_message(self):

View File

@@ -1,140 +0,0 @@
from typing import TypedDict
from bec_widgets.utils.error_popups import SafeSlot
import os
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QWidget, QPushButton, QLineEdit, QLabel, QVBoxLayout
from bec_qthemes import material_icon
from bec_lib.logger import bec_logger
logger = bec_logger.logger
# class OmnyAlignmentUIComponents(TypedDict):
# moveRightButton: QPushButton
# moveLeftButton: QPushButton
# moveUpButton: QPushButton
# moveDownButton: QPushButton
# image: Image
class OmnyAlignment(BECWidget, QWidget):
USER_ACCESS = ["enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter","sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
ui_file = "./omny_alignment.ui"
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.ui.moveRightButton.setText("")
self.ui.moveRightButton.setIcon(
material_icon(icon_name="keyboard_arrow_right", **icon_options)
)
self.ui.moveLeftButton.setText("")
self.ui.moveLeftButton.setIcon(
material_icon(icon_name="keyboard_arrow_left", **icon_options)
)
self.ui.moveUpButton.setText("")
self.ui.moveUpButton.setIcon(
material_icon(icon_name="keyboard_arrow_up", **icon_options)
)
self.ui.moveDownButton.setText("")
self.ui.moveDownButton.setIcon(
material_icon(icon_name="keyboard_arrow_down", **icon_options)
)
self.ui.confirmButton.setText("OK")
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
@property
def enable_live_view(self):
return self.ui.liveViewSwitch.checked
@enable_live_view.setter
def enable_live_view(self, enable:bool):
self.ui.liveViewSwitch.checked = enable
@property
def user_message(self):
return self.ui.messageLineEdit.text()
@user_message.setter
def user_message(self, message:str):
self.ui.messageLineEdit.setText(message)
@property
def sample_name(self):
return self.ui.sampleLineEdit.text()
@sample_name.setter
def sample_name(self, message:str):
self.ui.sampleLineEdit.setText(message)
@SafeSlot(bool)
def on_live_view_enabled(self, enabled:bool):
from bec_widgets.widgets.plots.image.image import Image
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam_xeye")
return
image.disconnect_monitor("cam_xeye")
@property
def enable_move_buttons(self):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
return move_up.isEnabled() and move_down.isEnabled() and move_left.isEnabled() and move_right.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled:bool):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
move_up.setEnabled(enabled)
move_down.setEnabled(enabled)
move_left.setEnabled(enabled)
move_right.setEnabled(enabled)
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
widget = OmnyAlignment()
widget.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['omny_alignment.py']}

View File

@@ -1,125 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>988</width>
<height>821</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="2" column="2">
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="2">
<widget class="QPushButton" name="moveRightButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QPushButton" name="moveLeftButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QPushButton" name="moveUpButton">
<property name="text">
<string>Up</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QPushButton" name="moveDownButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QPushButton" name="confirmButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="2" column="0">
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="1">
<widget class="QLineEdit" name="sampleLineEdit"/>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="messageLineEdit"/>
</item>
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Sample</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Message</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="1" column="0" colspan="3">
<widget class="Image" name="image">
<property name="enable_toolbar" stdset="0">
<bool>false</bool>
</property>
<property name="inner_axes" stdset="0">
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam_xeye</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>
</property>
</widget>
</item>
<item row="0" column="3">
<widget class="ToggleSwitch" name="liveViewSwitch"/>
</item>
<item row="0" column="2">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Live View</string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
</property>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>Image</class>
<extends>QWidget</extends>
<header>image</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1,54 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment import OmnyAlignment
DOM_XML = """
<ui language='c++'>
<widget class='OmnyAlignment' name='omny_alignment'>
</widget>
</ui>
"""
class OmnyAlignmentPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = OmnyAlignment(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(OmnyAlignment.ICON_NAME)
def includeFile(self):
return "omny_alignment"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "OmnyAlignment"
def toolTip(self):
return "OmnyAlignment"
def whatsThis(self):
return self.toolTip()

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment_plugin import OmnyAlignmentPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(OmnyAlignmentPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -33,46 +33,48 @@ class XRayEye2DControl(BECWidget, QWidget):
self.get_bec_shortcuts()
self._step_size = step_size
self.root_layout = QGridLayout(self)
self.setStyleSheet("""
self.setStyleSheet(
"""
QToolButton {
border: 1px solid;
border-radius: 4px;
}
""")
"""
)
# Up
self.move_up_button = QToolButton(parent=self)
self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
self.move_up_button.setIcon(material_icon("keyboard_double_arrow_up"))
self.root_layout.addWidget(self.move_up_button, 0, 2)
# Up tweak button
self.move_up_tweak_button = QToolButton(parent=self)
self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
self.move_up_tweak_button.setIcon(material_icon("keyboard_arrow_up"))
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
# Left
self.move_left_button = QToolButton(parent=self)
self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
self.move_left_button.setIcon(material_icon("keyboard_double_arrow_left"))
self.root_layout.addWidget(self.move_left_button, 2, 0)
# Left tweak button
self.move_left_tweak_button = QToolButton(parent=self)
self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
self.move_left_tweak_button.setIcon(material_icon("keyboard_arrow_left"))
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
# Right
self.move_right_button = QToolButton(parent=self)
self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
self.move_right_button.setIcon(material_icon("keyboard_double_arrow_right"))
self.root_layout.addWidget(self.move_right_button, 2, 4)
# Right tweak button
self.move_right_tweak_button = QToolButton(parent=self)
self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
self.move_right_tweak_button.setIcon(material_icon("keyboard_arrow_right"))
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
# Down
self.move_down_button = QToolButton(parent=self)
self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
self.move_down_button.setIcon(material_icon("keyboard_double_arrow_down"))
self.root_layout.addWidget(self.move_down_button, 4, 2)
# Down tweak button
self.move_down_tweak_button = QToolButton(parent=self)
self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
self.move_down_tweak_button.setIcon(material_icon("keyboard_arrow_down"))
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
# Connections
@@ -124,8 +126,15 @@ class XRayEye2DControl(BECWidget, QWidget):
class XRayEye(BECWidget, QWidget):
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
USER_ACCESS = [
"active_roi",
"user_message",
"user_message.setter",
"sample_name",
"sample_name.setter",
"enable_move_buttons",
"enable_move_buttons.setter",
]
PLUGIN = True
def __init__(self, parent=None, **kwargs):
@@ -136,7 +145,9 @@ class XRayEye(BECWidget, QWidget):
self._make_connections()
# Connection to redis endpoints
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
self.bec_dispatcher.connect_slot(
self.device_updates, MessageEndpoints.device_readback("omny_xray_gui")
)
self.connect_motors()
self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger)
@@ -145,7 +156,9 @@ class XRayEye(BECWidget, QWidget):
self.core_layout = QHBoxLayout(self)
self.image = Image(parent=self)
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
self.image.enable_toolbar = (
False # Disable default toolbar to not allow to user set anything
)
self.image.inner_axes = False # Disable inner axes to maximize image area
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
@@ -156,8 +169,9 @@ class XRayEye(BECWidget, QWidget):
self.control_panel_layout.setSpacing(10)
# ROI toolbar + Live toggle (header row)
self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
compact_orientation="horizontal")
self.roi_manager = ROIPropertyTree(
parent=self, image_widget=self.image, compact=True, compact_orientation="horizontal"
)
header_row = QHBoxLayout()
header_row.setContentsMargins(0, 0, 0, 0)
header_row.setSpacing(8)
@@ -230,7 +244,9 @@ class XRayEye(BECWidget, QWidget):
# Make connections
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
self.step_size.valueChanged.connect(
lambda x: self.motor_control_2d.setProperty("step_size", x)
)
self.submit_button.clicked.connect(self.submit)
def _create_separator(self):
@@ -248,12 +264,14 @@ class XRayEye(BECWidget, QWidget):
################################################################################
def connect_motors(self):
""" Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
"""Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ["osamroy", "lsamrot", "fsamroy"]
for motor in possible_motors:
if motor in self.dev:
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
self.bec_dispatcher.connect_slot(
self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor)
)
logger.info(f"Succesfully connected to {motor}")
################################################################################
@@ -341,7 +359,7 @@ class XRayEye(BECWidget, QWidget):
@SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict):
#TODO implement if needed
# TODO implement if needed
print(f"data: {data}")
print(f"meta: {meta}")
@@ -355,25 +373,25 @@ class XRayEye(BECWidget, QWidget):
meta(dict): metadata from device
"""
signals = data.get('signals')
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
signals = data.get("signals")
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get("value")
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get("value")
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get("value")
self.on_live_view_enabled(bool(enable_live_preview))
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
# Signals from epics gui device
# send message
user_message = signals.get("omny_xray_gui_send_message").get('value')
user_message = signals.get("omny_xray_gui_send_message").get("value")
self.user_message = user_message
# sample name
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
sample_message = signals.get("omny_xray_gui_sample_name").get("value")
self.sample_name = sample_message
# enable frame acquisition
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get("value")
self.on_live_view_enabled(bool(update_frame_acq))
# enable submit button
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
enable_submit_button = signals.get("omny_xray_gui_submit").get("value")
self.enable_submit_button(enable_submit_button)
@SafeSlot()
@@ -383,35 +401,40 @@ class XRayEye(BECWidget, QWidget):
logger.warning("No active ROI")
return
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
roi_center_x = roi_coordinates['center_x']
roi_center_y = roi_coordinates['center_y']
roi_center_x = roi_coordinates["center_x"]
roi_center_y = roi_coordinates["center_y"]
# Case of rectangular ROI
if isinstance(self.roi_manager.single_active_roi, RectangularROI):
roi_width = roi_coordinates['width']
roi_height = roi_coordinates['height']
roi_width = roi_coordinates["width"]
roi_height = roi_coordinates["height"]
elif isinstance(self.roi_manager.single_active_roi, CircularROI):
roi_width = roi_coordinates['diameter']
roi_height = roi_coordinates['radius']
roi_width = roi_coordinates["diameter"]
roi_height = roi_coordinates["radius"]
else:
logger.warning("Unsupported ROI type for submit action.")
return
print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
print(
f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}"
) # TODO remove when will be not needed for debugging
# submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get("value"))
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1)
def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
getattr(self.dev,CAMERA[0]).live_mode = False
self.bec_dispatcher.disconnect_slot(
self.device_updates, MessageEndpoints.device_readback("omny_xray_gui")
)
getattr(self.dev, CAMERA[0]).live_mode = False
super().cleanup()
if __name__ == "__main__":
import sys

View File

@@ -9,27 +9,27 @@ eiger_1_5:
readoutPriority: async
softwareTrigger: False
eiger_9:
description: Eiger 9M detector
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
deviceConfig:
detector_distance: 100
beam_center: [0, 0]
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: False
# eiger_9:
# description: Eiger 9M detector
# deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
# deviceConfig:
# detector_distance: 100
# beam_center: [0, 0]
# onFailure: raise
# enabled: true
# readoutPriority: async
# softwareTrigger: False
ids_cam:
description: IDS camera for live image acquisition
deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
deviceConfig:
camera_id: 201
bits_per_pixel: 24
m_n_colormode: 1
live_mode: True
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: True
# ids_cam:
# description: IDS camera for live image acquisition
# deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
# deviceConfig:
# camera_id: 201
# bits_per_pixel: 24
# m_n_colormode: 1
# live_mode: True
# onFailure: raise
# enabled: true
# readoutPriority: async
# softwareTrigger: True

View File

@@ -0,0 +1,25 @@
############################################################
##################### EPS ##################################
############################################################
x12saEPS:
description: X12SA EPS info and control
deviceClass: csaxs_bec.devices.epics.eps.EPS
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
##################### GalilRIO #############################
############################################################
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesxbox.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20

View File

@@ -1,11 +1,11 @@
# This is the main configuration file that is
# commented or uncommented according to the type of experiment
optics:
- !include ./bl_optics_hutch.yaml
# optics:
# - !include ./bl_optics_hutch.yaml
frontend:
- !include ./bl_frontend.yaml
# frontend:
# - !include ./bl_frontend.yaml
endstation:
- !include ./bl_endstation.yaml
@@ -16,8 +16,8 @@ detectors:
#sastt:
# - !include ./sastt.yaml
#flomni:
# - !include ./ptycho_flomni.yaml
flomni:
- !include ./ptycho_flomni.yaml
#omny:
# - !include ./ptycho_omny.yaml

View File

@@ -471,7 +471,7 @@ omnyfsh:
#################### GUI Signals ###########################
############################################################
omny_xray_gui:
description: Gui Epics signals
description: Gui signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
deviceConfig: {}
enabled: true
@@ -486,4 +486,25 @@ calculated_signal:
compute_method: "def just_rand():\n return 42"
enabled: true
readOnly: false
readoutPriority: baseline
readoutPriority: baseline
############################################################
#################### OMNY Pandabox #########################
############################################################
omny_panda:
readoutPriority: async
deviceClass: csaxs_bec.devices.panda_box.panda_box_omny.PandaBoxOMNY
deviceConfig:
host: omny-panda.psi.ch
signal_alias:
FMC_IN.VAL1.Min: cap_voltage_fzp_y_min
FMC_IN.VAL1.Max: cap_voltage_fzp_y_max
FMC_IN.VAL1.Mean: cap_voltage_fzp_y_mean
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
deviceTags:
- detector
enabled: true
readOnly: false
softwareTrigger: false

View File

@@ -309,7 +309,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Shutter opens without delay at t0, closes after exp_time * burst_count + 2ms (self._shutter_to_open_delay)
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
self.set_delay_pairs(channel="gh", delay=self._shutter_to_open_delay, width=(shutter_width-self._shutter_to_open_delay))
self.set_delay_pairs(
channel="gh",
delay=self._shutter_to_open_delay,
width=(shutter_width - self._shutter_to_open_delay),
)
# Trigger extra pulse for MCS OR gate
# f = e + 1us
@@ -520,7 +524,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
- Return the status object to BEC which will automatically resolve once the status register has
the END_OF_BURST bit set. The callback of the status object will also stop the polling loop.
"""
overall_start = time.time()
self._stop_polling()
# NOTE If the trigger source is not SINGLE_SHOT, the DDG is triggered by an external source
@@ -559,7 +562,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Send trigger
self.trigger_shot.put(1, use_complete=True)
self.cancel_on_stop(status)
logger.info(f"Configured ddg in {time.time()-overall_start}")
return status
def on_stop(self) -> None:

View File

@@ -261,7 +261,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
**kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance).
"""
with self._rlock:
logger.info(f"Received update on mcs card {self.name}")
if self._omit_mca_callbacks.is_set():
return # Suppress callbacks when erasing all channels
self._mca_counter_index += 1
@@ -293,9 +292,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
)
# Once we have received all channels, push data to BEC and reset for next accumulation
logger.info(
f"Received update for {attr_name}, index {self._mca_counter_index}/{self.NUM_MCA_CHANNELS}"
)
if len(self._current_data) == self.NUM_MCA_CHANNELS:
logger.debug(
f"Current data index {self._current_data_index} complete, pushing to BEC."
@@ -398,11 +394,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# NOTE Make sure that the signal that omits mca callbacks is cleared
self._omit_mca_callbacks.clear()
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
# For a fly scan we need to start the mcs card ourselves
if self.scan_info.msg.scan_type == "fly":
self.erase_start.put(1)
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_prescan(self) -> None | StatusBase:
"""
This method is called after on_stage and before the scan starts. For the MCS card, we need to make sure
@@ -446,7 +443,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
while not self._scan_done_thread_kill_event.is_set():
while self._start_monitor_async_data_emission.wait():
try:
logger.debug(f"Monitoring async data emission for {self.name}...")
if (
hasattr(self.scan_info.msg, "num_points")
and self.scan_info.msg.num_points is not None
@@ -456,7 +452,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
for callback in self._scan_done_callbacks:
callback(exception=None)
else:
logger.info(f"Current data index is {self._current_data_index}")
if self._current_data_index >= 1:
for callback in self._scan_done_callbacks:
callback(exception=None)

View File

@@ -132,7 +132,6 @@ class Eiger(PSIDeviceBase):
if data is None:
logger.error(f"Received image message on device {self.name} without data.")
return
logger.info(f"Received preview image on device {self.name}")
self.preview_image.put(data)
# pylint: disable=missing-function-docstring

View File

@@ -13,6 +13,7 @@ which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS varia
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
@@ -78,12 +79,38 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
"""
_NUM_ANALOG_CHANNELS = 8
READBACK_TIMEOUT = 0.1 # time to wait in between two readback attemps in seconds, otherwise return cached value
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
def __init__(
self,
signal_name: str,
channel: int,
parent: GalilRIO,
readback_timeout: float = None,
**kwargs,
):
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
self._channel = channel
self._metadata["connected"] = False
self._readback_timeout = (
readback_timeout if readback_timeout is not None else self.READBACK_TIMEOUT
)
self._metadata["write_access"] = False
self._last_readback = 0.0
def get(self):
current_time = time.monotonic()
if current_time - self._last_readback > self._readback_timeout:
old_value = self._readback
self._last_readback = current_time # _socket_get may rely on this value to be set.
self._readback = self._socket_get()
self._run_subs(
sub_type=self.SUB_VALUE,
old_value=old_value,
value=self._readback,
timestamp=current_time,
)
return self._readback
def _socket_set(self, val):
"""Read-only signal, so set method raises an error."""
@@ -136,6 +163,8 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
# Run subscriptions after all readbacks have been updated
# on all channels except the one that triggered the update
# TODO for now skip running subscribers, this should be re-implemented
# once we properly handle subscriptions from bec running "read"
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
@@ -185,7 +214,7 @@ def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
an_channels[f"ch{i}"] = (
GalilRIOAnalogSignalRO,
f"ch{i}",
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."},
{"kind": Kind.normal, "channel": i, "doc": f"Analog channel {i}."},
)
return an_channels
@@ -202,12 +231,7 @@ def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
di_out_channels[f"ch{i}"] = (
GalilRIODigitalOutSignal,
f"ch{i}",
{
"kind": Kind.config,
"notify_bec": True,
"channel": i,
"doc": f"Digital output channel {i}.",
},
{"kind": Kind.config, "channel": i, "doc": f"Digital output channel {i}."},
)
return di_out_channels

View File

@@ -65,10 +65,8 @@ class RtLamniController(Controller):
"_position_sampling_single_read",
"_position_sampling_single_reset_and_start_sampling",
"show_signal_strength_interferometer",
"show_interferometer_positions",
"show_analog_signals",
"show_feedback_status",
]
def __init__(

View File

@@ -0,0 +1,103 @@
"""Module to integrate the PandaBox for cSAXS measurements."""
import time
from bec_lib.logger import bec_logger
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
logger = bec_logger.logger
class PandaBoxCSAXS(PandaBox):
"""
PandaBox integration for cSAXS. This class implements cSAXS specific logic for the PandaBox integration.
TODO: This logic is not yet mapped to any existing hardware. Adapt Docstring once the hardware is defined and integrated.
"""
def on_init(self):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
def on_stage(self):
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
logger.info(f"PandaBox {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_complete(self):
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
def _check_capture_complete():
captured = 0
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
captured = int(ret[0].split("=")[-1])
time.sleep(0.01)
if (time.monotonic() - start_time) > self._timeout_on_completed / 2:
logger.info(
f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
)
if (time.monotonic() - start_time) > self._timeout_on_completed:
raise TimeoutError(
f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
)
finally:
self._disarm()
status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
self.cancel_on_stop(status_captured)
return status_captured
if __name__ == "__main__":
import time
panda = PandaBoxCSAXS(
name="omny_panda",
host="omny-panda.psi.ch",
signal_alias={
"FMC_IN.VAL2.Value": "alias",
"FMC_IN.VAL1.Min": "alias2",
"FMC_IN.VAL1.Max": "alias3",
"FMC_IN.VAL1.Mean": "alias4",
},
)
panda.on_connected()
status = StatusBase(obj=panda)
panda.add_status_callback(
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
)
panda.stop()
status.wait(timeout=2)
panda.unstage()
logger.info(f"Panda connected")
ret = panda.stage()
logger.info(f"Panda staged")
ret = panda.pre_scan()
ret.wait(timeout=5)
logger.info(f"Panda pre scan done")
time.sleep(5)
panda.stop()
st = panda.complete()
st.wait(timeout=5)
logger.info(f"Measurement completed")
panda.unstage()
logger.info(f"Panda Unstaged")

View File

@@ -0,0 +1,99 @@
"""Module to integrate the PandaBox for cSAXS measurements."""
import time
from bec_lib.logger import bec_logger
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
logger = bec_logger.logger
class PandaBoxOMNY(PandaBox):
"""PandaBox integration for OMNY. This class implements OMNY specific logic for the PandaBox integration."""
def on_init(self):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
def on_stage(self):
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
logger.info(f"PandaBox {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_complete(self):
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
def _check_capture_complete():
captured = 0
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
captured = int(ret[0].split("=")[-1])
time.sleep(0.01)
if (time.monotonic() - start_time) > self._timeout_on_completed / 2:
logger.info(
f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
)
if (time.monotonic() - start_time) > self._timeout_on_completed:
raise TimeoutError(
f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
)
finally:
self._disarm()
status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
self.cancel_on_stop(status_captured)
return status_captured
if __name__ == "__main__":
import time
panda = PandaBoxOMNY(
name="omny_panda",
host="omny-panda.psi.ch",
signal_alias={
"FMC_IN.VAL2.Value": "alias",
"FMC_IN.VAL1.Min": "alias2",
"FMC_IN.VAL1.Max": "alias3",
"FMC_IN.VAL1.Mean": "alias4",
},
)
panda.on_connected()
status = StatusBase(obj=panda)
panda.add_status_callback(
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
)
panda.stop()
status.wait(timeout=2)
panda.unstage()
logger.info(f"Panda connected")
ret = panda.stage()
logger.info(f"Panda staged")
ret = panda.pre_scan()
ret.wait(timeout=5)
logger.info(f"Panda pre scan done")
time.sleep(5)
panda.stop()
st = panda.complete()
st.wait(timeout=5)
logger.info(f"Measurement completed")
panda.unstage()
logger.info(f"Panda Unstaged")

View File

@@ -210,7 +210,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
def __init__(self, *args, parameter: dict = None, frames_per_trigger:int=1, exp_time:float=0,**kwargs):
"""
A LamNI scan following Fermat's spiral.
@@ -230,10 +230,10 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
Examples:
>>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1)
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1)
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1, frames_per_trigger=1)
"""
super().__init__(parameter=parameter, **kwargs)
super().__init__(parameter=parameter, frames_per_trigger=frames_per_trigger, exp_time=exp_time,**kwargs)
self.axis = []
scan_kwargs = parameter.get("kwargs", {})
self.fov_size = scan_kwargs.get("fov_size")
@@ -482,6 +482,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
yield from self.open_scan()
yield from self.stage()
yield from self.run_baseline_reading()
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()

View File

@@ -52,6 +52,7 @@ class FlomniFermatScan(SyncFlyScanBase):
angle: float = None,
corridor_size: float = 3,
parameter: dict = None,
frames_per_trigger:int=1,
**kwargs,
):
"""
@@ -62,7 +63,8 @@ class FlomniFermatScan(SyncFlyScanBase):
fovy(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 100 um
cenx(float) [um]: center position in x.
ceny(float) [um]: center position in y.
exp_time(float) [s]: exposure time
exp_time(float) [s]: exposure time per burst frame
frames_per_trigger(int) : Number of burst frames per point
step(float) [um]: stepsize
zshift(float) [um]: shift in z
angle(float) [deg]: rotation angle (will rotate first)
@@ -71,10 +73,10 @@ class FlomniFermatScan(SyncFlyScanBase):
Returns:
Examples:
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)
"""
super().__init__(parameter=parameter, exp_time=exp_time, **kwargs)
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
self.show_live_table = False
self.axis = []
self.fovx = fovx
@@ -323,6 +325,7 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.stage()
yield from self.run_baseline_reading()
yield from self._prepare_setup_part2()
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()

View File

@@ -1,4 +1,4 @@
""" Module with JungfrauJochTestScan class. """
"""Module with JungfrauJochTestScan class."""
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion

View File

@@ -51,6 +51,7 @@ class OMNYFermatScan(SyncFlyScanBase):
angle: float = None,
corridor_size: float = 3,
parameter: dict = None,
frames_per_trigger:int=1,
**kwargs,
):
"""
@@ -62,6 +63,7 @@ class OMNYFermatScan(SyncFlyScanBase):
cenx(float) [um]: center position in x.
ceny(float) [um]: center position in y.
exp_time(float) [s]: exposure time
frames_per_trigger:int: Number of burst frames per trigger, defaults to 1.
step(float) [um]: stepsize
zshift(float) [um]: shift in z
angle(float) [deg]: rotation angle (will rotate first)
@@ -73,7 +75,7 @@ class OMNYFermatScan(SyncFlyScanBase):
>>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=10, ceny=0, zshift=0, angle=0, step=2, exp_time=0.01)
"""
super().__init__(parameter=parameter, **kwargs)
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
self.axis = []
self.fovx = fovx
self.fovy = fovy
@@ -299,6 +301,7 @@ class OMNYFermatScan(SyncFlyScanBase):
yield from self.stage()
yield from self.run_baseline_reading()
yield from self._prepare_setup_part2()
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()

View File

@@ -193,14 +193,15 @@ The basic scan function can be called by `scans.flomni_fermat_scan()` and offers
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
| cenx (float) | center position in x |
| ceny (float) | center position in y |
| exp_time (float) | exposure time |
| exp_time (float) | exposure time per frame |
| frames_per_trigger(int) | Number of burst frames per position |
| step (float) | stepsize |
| zshift (float) | shift in z |
| angle (float) | rotation angle (will rotate first) |
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
Example:
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)`
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
#### Overview of the alignment steps

View File

@@ -327,14 +327,15 @@ The basic scan function can be called by `scans.omny_fermat_scan()` and offers a
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
| cenx (float) | center position in x |
| ceny (float) | center position in y |
| exp_time (float) | exposure time |
| exp_time (float) | exposure time per frame |
| frames_per_trigger(int) | Number of burst frames per position |
| step (float) | stepsize |
| zshift (float) | shift in z |
| angle (float) | rotation angle (will rotate first) |
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
Example:
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)`
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
#### Overview of the alignment steps

View File

@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
name = "csaxs_bec"
version = "0.0.0"
description = "The cSAXS plugin repository for BEC"
requires-python = ">=3.10"
requires-python = ">=3.11"
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",

View File

@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
with mock.patch(
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
"csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
) as epics_put_mock:
align.save_frame()
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
def test_update_frame(bec_client_mock):
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen"
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(
name="xeye",

View File

@@ -0,0 +1,190 @@
"""Module for testing the PandaBoxCSAXS and PandaBoxOMNY devices."""
# pylint: skip-file
from __future__ import annotations
from unittest import mock
import pytest
from ophyd import Staged
from csaxs_bec.devices.panda_box.panda_box import PandaBoxCSAXS
from csaxs_bec.devices.panda_box.panda_box_omny import PandaBoxOMNY
@pytest.fixture
def panda_omny():
dev_name = "panda_omny"
dev = PandaBoxOMNY(
name=dev_name,
host="omny-panda-box.psi.ch",
signal_alias={
"FMC_IN.VAL1.Min": "cap_voltage_fzp_y_min",
"FMC_IN.VAL1.Max": "cap_voltage_fzp_y_max",
"FMC_IN.VAL1.Mean": "cap_voltage_fzp_y_mean",
"FMC_IN.VAL2.Min": "cap_voltage_fzp_x_min",
"FMC_IN.VAL2.Max": "cap_voltage_fzp_x_max",
"FMC_IN.VAL2.Mean": "cap_voltage_fzp_x_mean",
},
)
yield dev
@pytest.fixture
def panda_csaxs():
dev_name = "panda_csaxs"
dev = PandaBoxCSAXS(name=dev_name, host="csaxs-panda-box.psi.ch")
yield dev
def test_panda_omny(panda_omny):
assert panda_omny.name == "panda_omny"
assert panda_omny.host == "omny-panda-box.psi.ch"
all_signal_names = [name for name, _ in panda_omny.data.signals]
# Check that the signal aliases are correctly set up
assert "cap_voltage_fzp_y_min" in all_signal_names
assert "cap_voltage_fzp_y_max" in all_signal_names
assert "cap_voltage_fzp_y_mean" in all_signal_names
assert "cap_voltage_fzp_x_min" in all_signal_names
assert "cap_voltage_fzp_x_max" in all_signal_names
assert "cap_voltage_fzp_x_mean" in all_signal_names
# Check that the original signal names are not present
assert "FMC_IN.VAL1.Min" not in all_signal_names
assert "FMC_IN.VAL1.Max" not in all_signal_names
assert "FMC_IN.VAL1.Mean" not in all_signal_names
assert "FMC_IN.VAL2.Min" not in all_signal_names
assert "FMC_IN.VAL2.Max" not in all_signal_names
assert "FMC_IN.VAL2.Mean" not in all_signal_names
assert panda_omny._acquisition_group == "burst"
assert panda_omny._timeout_on_completed == 10
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
],
)
def test_panda_omny_stage(panda_omny, scan_type, frames_per_trigger, expected_acquisition_group):
# Check that the stage signal is present and has the correct PV
assert len(panda_omny._status_callbacks) == 0
panda_omny.scan_info.msg.scan_type = scan_type
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_omny.stage()
assert panda_omny._acquisition_group == expected_acquisition_group
assert panda_omny.staged == Staged.yes
def test_panda_omny_complete(panda_omny):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_omny.scan_info.msg.num_points = 1
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_omny._timeout_on_completed = 0.5 # Set a short timeout for testing
def _mock_return_captured(*args, **kwargs):
return ["=0"]
# Timeout Error on complete
with (
mock.patch.object(panda_omny, "send_raw", side_effect=_mock_return_captured),
mock.patch.object(panda_omny, "_disarm", return_value=None) as mock_disarm,
):
status = panda_omny.on_complete()
assert status.done is False
assert status.success is False
with pytest.raises(TimeoutError):
status.wait(timeout=4)
mock_disarm.assert_called_once()
# Successful complete
panda_omny._timeout_on_completed = 5
with (
mock.patch.object(panda_omny, "send_raw", side_effect=[["=0"], ["=0"], ["=1"]]),
mock.patch.object(panda_omny, "_disarm", return_value=None) as mock_disarm,
):
status = panda_omny.on_complete()
assert status.done is False
assert status.success is False
status.wait(timeout=4)
mock_disarm.assert_called_once()
assert status.done is True
assert status.success is True
def test_panda_csaxs(panda_csaxs):
assert panda_csaxs.name == "panda_csaxs"
assert panda_csaxs.host == "csaxs-panda-box.psi.ch"
assert panda_csaxs._acquisition_group == "burst"
assert panda_csaxs._timeout_on_completed == 10
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
],
)
def test_panda_csaxs_stage(panda_csaxs, scan_type, frames_per_trigger, expected_acquisition_group):
"""Test the on_stage method of the PandaBoxCSAXS device for different scan types and frames per trigger."""
assert len(panda_csaxs._status_callbacks) == 0
panda_csaxs.scan_info.msg.scan_type = scan_type
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_csaxs.stage()
assert panda_csaxs._acquisition_group == expected_acquisition_group
assert panda_csaxs.staged == Staged.yes
def test_panda_csaxs_complete(panda_csaxs):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_csaxs.scan_info.msg.num_points = 1
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_csaxs._timeout_on_completed = 0.5 # Set a short timeout for testing
def _mock_return_captured(*args, **kwargs):
return ["=0"]
# Timeout Error on complete
with (
mock.patch.object(panda_csaxs, "send_raw", side_effect=_mock_return_captured),
mock.patch.object(panda_csaxs, "_disarm", return_value=None) as mock_disarm,
):
status = panda_csaxs.on_complete()
assert status.done is False
assert status.success is False
with pytest.raises(TimeoutError):
status.wait(timeout=4)
mock_disarm.assert_called_once()
# Successful complete
panda_csaxs._timeout_on_completed = 5
with (
mock.patch.object(panda_csaxs, "send_raw", side_effect=[["=0"], ["=0"], ["=1"]]),
mock.patch.object(panda_csaxs, "_disarm", return_value=None) as mock_disarm,
):
status = panda_csaxs.on_complete()
assert status.done is False
assert status.success is False
status.wait(timeout=4)
mock_disarm.assert_called_once()
assert status.done is True
assert status.success is True

View File

@@ -302,6 +302,36 @@ def device_manager_mock():
action="set",
parameter={"value": 2.1508313829565293},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
action="pre_scan",
parameter={},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="set",
parameter={"value": 1.3681828686580249},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="set",
parameter={"value": 2.1508313829565293},
),
None,
messages.DeviceInstructionMessage(
metadata={