Compare commits

...

35 Commits

Author SHA1 Message Date
c7db607aab fix(mcs): omit_mca_callbacks if stop is called.
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
CI for csaxs_bec / test (pull_request) Successful in 1m59s
2026-03-04 13:40:43 +01:00
4721ec404b fix(mcs): remove info logs
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m54s
CI for csaxs_bec / test (push) Successful in 1m58s
2026-03-04 09:13:55 +01:00
4d69f8f90f fix(bec_widgets): removed omny alignment old gui
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-02 21:00:14 +01:00
0f072a786e test: add tests for panda, fix tests for fermat scan
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-02 13:15:53 +01:00
05a1e3d8be refactor: cleanup docs and logs for most relevant devices 2026-03-02 13:15:53 +01:00
e9fd9084b8 refactor(csaxsdlpca200): cleanup docs 2026-03-02 13:15:53 +01:00
40ef387134 fix(panda): make complete asyncronous for PandaBoxOmny 2026-03-02 13:15:53 +01:00
x12sa
6ed84664f2 added docs for burst acquisition 2026-03-02 13:15:53 +01:00
x12sa
e5e3343da7 post startup script sessions for all setups, and required modifications 2026-03-02 13:15:53 +01:00
x12sa
c8866faccc logic for gain setting and readback for bpm amplifiers 2026-03-02 13:15:53 +01:00
x12sa
3b561c251c fix(config): remove panda test config 2026-03-02 13:15:53 +01:00
x12sa
bc187040ad refactor(panda-box): add pandaboxomny and refactor panda_box main integration 2026-03-02 13:15:53 +01:00
x12sa
efd27a27e8 fix(ferma-scan): fix flomni, lamni and omny fermat scans. add exp_time and frames_per_trigger 2026-03-02 13:15:53 +01:00
x12sa
7096ef3323 refactor(config): Update configs for bl_general and flomni 2026-03-02 13:15:53 +01:00
13378f24dd refactor(panda-box): refactor Pandabox, moving logic to base class 2026-03-02 13:15:53 +01:00
x01dc
f5b898ea1c feat: add panda box csaxs integration 2026-03-01 18:15:57 +01:00
3d62bea04b Update repo with template version v1.2.8
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m55s
CI for csaxs_bec / test (push) Successful in 1m54s
2026-02-27 16:25:22 +01:00
1518845d25 resolve merge conflicts 2026-02-27 16:25:22 +01:00
ff3b6686db Update repo with template version v1.2.7 2026-02-27 16:25:22 +01:00
afdc64e296 fix(rio): fix rio cached readings
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m40s
CI for csaxs_bec / test (push) Successful in 1m36s
2026-02-26 16:15:29 +01:00
bc31c00e1f fix(tests): x_ray_eye_align correct imports fixed after refactor of LamNI
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m36s
CI for csaxs_bec / test (push) Successful in 1m37s
2026-02-23 13:25:09 +01:00
x01dc
38671f074e minor printout fix
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m30s
CI for csaxs_bec / test (push) Failing after 1m32s
2026-02-23 12:44:04 +01:00
x01dc
92e39a5f75 minor adjmustment 2026-02-23 12:35:56 +01:00
x01dc
22c48115a4 final refactor step prior testing
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m33s
CI for csaxs_bec / test (push) Failing after 1m30s
2026-02-21 13:30:24 +01:00
2a7448526b new files for refactor
Some checks failed
CI for csaxs_bec / test (push) Failing after 36s
2026-02-21 13:02:35 +01:00
x01dc
a5825307e5 refactor remove previous files
Some checks failed
CI for csaxs_bec / test (push) Failing after 35s
2026-02-21 13:00:48 +01:00
x01dc
54f1f42332 added tomo type 2 and 3, for golden ratio
Some checks failed
CI for csaxs_bec / test (pull_request) Successful in 1m37s
CI for csaxs_bec / test (push) Has been cancelled
2026-02-21 11:52:32 +01:00
x01dc
48df15f35c added rt controller commands to lamni namespace
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m38s
2026-02-21 11:38:34 +01:00
x01dc
6f60bd4b2b first commit, getting lamni running and adding some missing features in controller rt and galil
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m45s
2026-02-20 14:39:22 +01:00
5d97913956 refactor(galil-rio): Improve docstring and general integration
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m31s
CI for csaxs_bec / test (push) Successful in 1m32s
2026-02-16 17:48:35 +01:00
93384b87e0 fix(readback-timeout): fix monkeypatch for readback timeout; closes #140
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m29s
CI for csaxs_bec / test (pull_request) Failing after 1m29s
2026-02-16 17:38:10 +01:00
9a249363fd refactor(galil-rio): fix socket-signal cached readings 2026-02-16 17:38:10 +01:00
f925a7c1db fix(galilsignalbase): fix root access for controller from parent.
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m29s
CI for csaxs_bec / test (pull_request) Failing after 1m33s
2026-02-16 14:57:02 +01:00
5811e445fe tests: fix tests after refactoring 2026-02-16 14:57:02 +01:00
16ea7f410e feat(galil-rio): Add di_out channels to GalilRIO 2026-02-16 14:57:02 +01:00
58 changed files with 3400 additions and 2216 deletions

View File

@@ -2,7 +2,7 @@
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.2.2
_commit: v1.2.8
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: csaxs_bec

View File

@@ -28,7 +28,7 @@ on:
description: "Python version to use"
required: false
type: string
default: "3.11"
default: "3.12"
permissions:
pull-requests: write
@@ -44,7 +44,19 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/csaxs_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./csaxs_bec
- name: Lint for merge conflicts from template updates
shell: bash
# Find all Copier conflicts except this line
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
- name: Checkout BEC Core
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec
@@ -55,13 +67,6 @@ jobs:
- name: Checkout BEC Widgets
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/csaxs_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./csaxs_bec
- name: Install dependencies
shell: bash
run: |

View File

@@ -0,0 +1,62 @@
name: Create template upgrade PR for csaxs_bec
on:
workflow_dispatch:
permissions:
pull-requests: write
jobs:
create_update_branch_and_pr:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install tools
run: |
pip install copier PySide6
- name: Checkout
uses: actions/checkout@v4
- name: Perform update
run: |
git config --global user.email "bec_ci_staging@psi.ch"
git config --global user.name "BEC automated CI"
branch="chore/update-template-$(python -m uuid)"
echo "switching to branch $branch"
git checkout -b $branch
echo "Running copier update..."
output="$(copier update --trust --defaults --conflict inline 2>&1)"
echo "$output"
msg="$(printf '%s\n' "$output" | head -n 1)"
if ! grep -q "make_commit: true" .copier-answers.yml ; then
echo "Autocommit not made, committing..."
git add -A
git commit -a -m "$msg"
fi
if diff-index --quiet HEAD ; then
echo "No changes detected"
exit 0
fi
git push -u origin $branch
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"Template: $(echo $msg)\",
\"body\": \"This PR was created by Gitea Actions\",
\"head\": \"$(echo $branch)\",
\"base\": \"main\"
}"

View File

@@ -1,20 +0,0 @@
include:
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "csaxs"
target: "csaxs_bec"
branch: $CHILD_PIPELINE_BRANCH
pages:
stage: Deploy
needs: []
variables:
TARGET_BRANCH: $CI_COMMIT_REF_NAME
rules:
- if: "$CI_COMMIT_TAG != null"
variables:
TARGET_BRANCH: $CI_COMMIT_TAG
- if: '$CI_COMMIT_REF_NAME == "main" && $CI_PROJECT_PATH == "bec/csaxs_bec"'
script:
- curl -X POST -d "branches=$CI_COMMIT_REF_NAME" -d "token=$RTD_TOKEN" https://readthedocs.org/api/v2/webhook/sls-csaxs/270162/

View File

@@ -1,2 +1,6 @@
from .load_additional_correction import lamni_read_additional_correction
from .x_ray_eye_align import DataDrivenLamNI, LamNI, MagLamNI, XrayEyeAlign
from .alignment import XrayEyeAlign
from .lamni import LamNI
from .lamni_optics_mixin import LamNIInitError, LaMNIInitStages, LamNIOpticsMixin
__all__ = [
"LamNI", "XrayEyeAlign", "LamNIInitError", "LaMNIInitStages", "LamNIOpticsMixin"
]

View File

@@ -0,0 +1,461 @@
import builtins
import time
from collections import defaultdict
import numpy as np
from bec_lib import bec_logger
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class XrayEyeAlign:
# pixel calibration, multiply to get mm
# PIXEL_CALIBRATION = 0.2/209 #.2 with binning
PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning
def __init__(self, client, lamni) -> None:
self.client = client
self.lamni = lamni
self.device_manager = client.device_manager
self.scans = client.scans
self.alignment_values = defaultdict(list)
self._reset_init_values()
self.corr_pos_x = []
self.corr_pos_y = []
self.corr_angle = []
self.corr_pos_x_2 = []
self.corr_pos_y_2 = []
self.corr_angle_2 = []
# ------------------------------------------------------------------
# Correction reset
# ------------------------------------------------------------------
def reset_correction(self):
self.corr_pos_x = []
self.corr_pos_y = []
self.corr_angle = []
def reset_correction_2(self):
self.corr_pos_x_2 = []
self.corr_pos_y_2 = []
self.corr_angle_2 = []
def reset_xray_eye_correction(self):
self.client.delete_global_var("tomo_fit_xray_eye")
# ------------------------------------------------------------------
# FOV offset properties
# ------------------------------------------------------------------
@property
def tomo_fovx_offset(self):
val = self.client.get_global_var("tomo_fov_offset")
if val is None:
return 0.0
return val[0] / 1000
@tomo_fovx_offset.setter
@typechecked
def tomo_fovx_offset(self, val: float):
val_old = self.client.get_global_var("tomo_fov_offset")
if val_old is None:
val_old = [0.0, 0.0]
self.client.set_global_var("tomo_fov_offset", [val * 1000, val_old[1]])
@property
def tomo_fovy_offset(self):
val = self.client.get_global_var("tomo_fov_offset")
if val is None:
return 0.0
return val[1] / 1000
@tomo_fovy_offset.setter
@typechecked
def tomo_fovy_offset(self, val: float):
val_old = self.client.get_global_var("tomo_fov_offset")
if val_old is None:
val_old = [0.0, 0.0]
self.client.set_global_var("tomo_fov_offset", [val_old[0], val * 1000])
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _reset_init_values(self):
self.shift_xy = [0, 0]
self._xray_fov_xy = [0, 0]
def _disable_rt_feedback(self):
self.device_manager.devices.rtx.controller.feedback_disable()
def _enable_rt_feedback(self):
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable
umv(self.device_manager.devices.lsamrot, val)
def get_tomo_angle(self):
return self.device_manager.devices.lsamrot.readback.read()["lsamrot"]["value"]
# ------------------------------------------------------------------
# X-ray eye camera control
# ------------------------------------------------------------------
def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self):
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
# start live
epics_put("XOMNYI-XEYE-ACQ:0", 1)
# wait for start live
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
time.sleep(0.5)
print("waiting for live view to start...")
fshopen()
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...")
time.sleep(0.5)
time.sleep(0.5)
# stop live view
epics_put("XOMNYI-XEYE-ACQ:0", 0)
time.sleep(1)
print("got new frame")
def update_fov(self, k: int):
self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0])
self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0])
@property
def movement_buttons_enabled(self):
return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")]
@movement_buttons_enabled.setter
def movement_buttons_enabled(self, enabled: bool):
enabled = int(enabled)
epics_put("XOMNYI-XEYE-ENAMVX:0", enabled)
epics_put("XOMNYI-XEYE-ENAMVY:0", enabled)
def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
# ------------------------------------------------------------------
# Alignment procedure
# ------------------------------------------------------------------
def align(self):
self._reset_init_values()
self.reset_correction()
self.reset_correction_2()
self._disable_rt_feedback()
epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION)
self._enable_rt_feedback()
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-ACQ:0", 0)
self.send_message("please wait...")
epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", "Let us LAMNI...")
self._disable_rt_feedback()
k = 0
self.lamni.lfzp_in()
self.update_frame()
self.movement_buttons_enabled = False
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-STEP:0", 0)
self.send_message("Submit center value of FZP.")
while True:
if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1:
val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") * self.PIXEL_CALIBRATION # in mm
val_y = epics_get(f"XOMNYI-XEYE-YVAL_Y:{k}") * self.PIXEL_CALIBRATION # in mm
self.alignment_values[k] = [val_x, val_y]
print(
f"Clicked position {k}: x {self.alignment_values[k][0]}, y"
f" {self.alignment_values[k][1]}"
)
if k == 0: # received center value of FZP
self.send_message("please wait ...")
self.lamni.loptics_out()
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
print("Moving sample in, FZP out")
self._disable_rt_feedback()
time.sleep(0.3)
self._enable_rt_feedback()
time.sleep(0.3)
self.update_frame()
self.send_message("Go and find the sample")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled = True
elif k == 1: # received sample center value at samrot 0
msg = (
f"Base shift values from movement are x {self.shift_xy[0]}, y"
f" {self.shift_xy[1]}"
)
print(msg)
logger.info(msg)
self.shift_xy[0] += (
self.alignment_values[0][0] - self.alignment_values[1][0]
) * 1000
self.shift_xy[1] += (
self.alignment_values[1][1] - self.alignment_values[0][1]
) * 1000
print(
"Base shift values from movement and clicked position are x"
f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
time.sleep(1)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame()
self.send_message("Submit sample center and FOV (0 deg)")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.update_fov(k)
elif 1 < k < 10: # received sample center value at samrot 0 ... 315
self.send_message("please wait ...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self._disable_rt_feedback()
self.tomo_rotate((k - 1) * 45 - 45 / 2)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
self._disable_rt_feedback()
self.tomo_rotate((k - 1) * 45)
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame()
self.send_message("Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
self.update_fov(k)
elif k == 10: # received sample center value at samrot 270, done
self.send_message("done...")
epics_put("XOMNYI-XEYE-SUBMIT:0", -1)
self.movement_buttons_enabled = False
self.update_fov(k)
break
k += 1
epics_put("XOMNYI-XEYE-STEP:0", k)
if k < 2:
_xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0")
_xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0")
if _xrayeyalignmvx != 0 or _xrayeyalignmvy != 0:
self.shift_xy[0] = self.shift_xy[0] + _xrayeyalignmvx
self.shift_xy[1] = self.shift_xy[1] + _xrayeyalignmvy
self.scans.lamni_move_to_scan_center(
self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle()
).wait()
print(
f"Current center horizontal {self.shift_xy[0]} vertical {self.shift_xy[1]}"
)
epics_put("XOMNYI-XEYE-MVY:0", 0)
epics_put("XOMNYI-XEYE-MVX:0", 0)
self.update_frame()
time.sleep(0.2)
self.write_output()
fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2
fovy = self._xray_fov_xy[1] * self.PIXEL_CALIBRATION * 1000 / 2
print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns,"
f" fovy = {fovy:.0f} microns"
)
print("Use matlab routine to fit the current alignment...")
print(
"This additional shift is applied to the base shift values\n which are x"
f" {self.shift_xy[0]}, y {self.shift_xy[1]}"
)
self._disable_rt_feedback()
self.tomo_rotate(0)
print(
"\n\nNEXT LOAD ALIGNMENT PARAMETERS\nby running"
" lamni.align.read_xray_eye_correction()\n"
)
self.client.set_global_var("tomo_fov_offset", self.shift_xy)
# ------------------------------------------------------------------
# Alignment output
# ------------------------------------------------------------------
def write_output(self):
import os
with open(
os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues"), "w"
) as alignment_values_file:
alignment_values_file.write("angle\thorizontal\tvertical\n")
for k in range(2, 11):
fovx_offset = (self.alignment_values[0][0] - self.alignment_values[k][0]) * 1000
fovy_offset = (self.alignment_values[k][1] - self.alignment_values[0][1]) * 1000
print(
f"Writing to file new alignment: number {k}, value x {fovx_offset}, y"
f" {fovy_offset}"
)
alignment_values_file.write(f"{(k-2)*45}\t{fovx_offset}\t{fovy_offset}\n")
# ------------------------------------------------------------------
# X-ray eye sinusoidal correction (loaded from MATLAB fit files)
# ------------------------------------------------------------------
def read_xray_eye_correction(self, dir_path=None):
import os
if dir_path is None:
dir_path = os.path.expanduser("~/Data10/specES1/internal/")
tomo_fit_xray_eye = np.zeros((2, 3))
for i, axis in enumerate(["x", "y"]):
for j, coeff in enumerate(["A", "B", "C"]):
with open(os.path.join(dir_path, f"ptychotomoalign_{coeff}{axis}.txt"), "r") as f:
tomo_fit_xray_eye[i][j] = f.readline()
self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist())
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
print("New alignment parameters loaded from X-ray eye")
print(
f"X Amplitude {tomo_fit_xray_eye[0][0]}, "
f"X Phase {tomo_fit_xray_eye[0][1]}, "
f"X Offset {tomo_fit_xray_eye[0][2]}, "
f"Y Amplitude {tomo_fit_xray_eye[1][0]}, "
f"Y Phase {tomo_fit_xray_eye[1][1]}, "
f"Y Offset {tomo_fit_xray_eye[1][2]}"
)
def lamni_compute_additional_correction_xeye_mu(self, angle):
"""Compute sinusoidal correction from the X-ray eye fit for the given angle."""
tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye")
if tomo_fit_xray_eye is None:
print("Not applying any additional correction. No x-ray eye data available.\n")
return (0, 0)
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
correction_x = (
tomo_fit_xray_eye[0][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[0][1])
+ tomo_fit_xray_eye[0][2]
) / 1000
correction_y = (
tomo_fit_xray_eye[1][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[1][1])
+ tomo_fit_xray_eye[1][2]
) / 1000
print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n")
return (correction_x, correction_y)
# ------------------------------------------------------------------
# Additional lookup-table corrections (iteration 1 and 2)
# ------------------------------------------------------------------
def read_additional_correction(self, correction_file: str):
self.corr_pos_x, self.corr_pos_y, self.corr_angle = self._read_correction_file_xy(
correction_file
)
def read_additional_correction_2(self, correction_file: str):
self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2 = self._read_correction_file_xy(
correction_file
)
def _read_correction_file_xy(self, correction_file: str):
"""Parse a correction file that contains corr_pos_x, corr_pos_y and corr_angle entries."""
with open(correction_file, "r") as f:
num_elements = f.readline()
int_num_elements = int(num_elements.split(" ")[2])
print(int_num_elements)
corr_pos_x = []
corr_pos_y = []
corr_angle = []
for j in range(0, int_num_elements * 3):
line = f.readline()
value = line.split(" ")[2]
name = line.split(" ")[0].split("[")[0]
if name == "corr_pos_x":
corr_pos_x.append(float(value) / 1000)
elif name == "corr_pos_y":
corr_pos_y.append(float(value) / 1000)
elif name == "corr_angle":
corr_angle.append(float(value))
return corr_pos_x, corr_pos_y, corr_angle
def compute_additional_correction(self, angle):
return self._compute_correction_xy(
angle, self.corr_pos_x, self.corr_pos_y, self.corr_angle, label="1"
)
def compute_additional_correction_2(self, angle):
return self._compute_correction_xy(
angle, self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2, label="2"
)
def _compute_correction_xy(self, angle, corr_pos_x, corr_pos_y, corr_angle, label=""):
"""Find the correction for the closest angle in the lookup table."""
if not corr_pos_x:
print(f"Not applying additional correction {label}. No data available.\n")
return (0, 0)
shift_x = corr_pos_x[0]
shift_y = corr_pos_y[0]
angledelta = np.fabs(corr_angle[0] - angle)
for j in range(1, len(corr_pos_x)):
newangledelta = np.fabs(corr_angle[j] - angle)
if newangledelta < angledelta:
shift_x = corr_pos_x[j]
shift_y = corr_pos_y[j]
angledelta = newangledelta
if shift_x == 0 and angle < corr_angle[0]:
shift_x = corr_pos_x[0]
shift_y = corr_pos_y[0]
if shift_x == 0 and angle > corr_angle[-1]:
shift_x = corr_pos_x[-1]
shift_y = corr_pos_y[-1]
print(f"Additional correction shifts {label}: {shift_x} {shift_y}")
return (shift_x, shift_y)

View File

@@ -0,0 +1,211 @@
"""
extra_tomo.py
=============
Specialist LamNI subclasses for specific experimental configurations.
Import explicitly when needed, e.g.:
from csaxs_bec...extra_tomo import MagLamNI
from csaxs_bec...extra_tomo import DataDrivenLamNI
"""
import builtins
import datetime
import os
import time
import h5py
import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from .lamni import LamNI
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
scans = builtins.__dict__.get("scans")
class MagLamNI(LamNI):
"""LamNI subclass for magnetic experiments (XMCD).
Adds a slow rotation helper and allows injection of a custom
per-angle callback via the ``lamni_at_each_angle`` builtin.
"""
def sub_tomo_scan(self, subtomo_number, start_angle=None):
super().sub_tomo_scan(subtomo_number, start_angle)
# self.rotate_slowly(0)
def rotate_slowly(self, angle, step_size=20):
"""Rotate to target angle in small steps to avoid mechanical stress."""
current_angle = dev.lsamrot.read(cached=True)["value"]
steps = int(np.ceil(np.abs(current_angle - angle) / step_size)) + 1
for target_angle in np.linspace(current_angle, angle, steps, endpoint=True):
umv(dev.lsamrot, target_angle)
scans.lamni_move_to_scan_center(
self.align.tomo_fovx_offset, self.align.tomo_fovy_offset, target_angle
)
def _at_each_angle(self, angle: float) -> None:
if "lamni_at_each_angle" in builtins.__dict__:
# pylint: disable=undefined-variable
lamni_at_each_angle(self, angle)
return
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
class DataDrivenLamNI(LamNI):
"""LamNI subclass that reads per-projection scan parameters from an HDF5 file.
Instead of a fixed FOV and step size for the whole tomogram, each
projection can have individual values for step size, loptz position
and lateral shifts, as specified in a datadriven_params.h5 file.
"""
def __init__(self, client):
super().__init__(client)
self.tomo_data = {}
def tomo_scan(
self,
subtomo_start=1,
start_index=None,
fname="~/Data10/data_driven_config/datadriven_params.h5",
):
"""Start a data-driven tomo scan.
Args:
subtomo_start (int): Unused; kept for API compatibility. Use start_index to resume.
start_index (int, optional): Skip projections before this index. Defaults to None.
fname (str): Path to the HDF5 parameter file. Defaults to the standard location.
"""
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
fname = os.path.expanduser(fname)
if not os.path.exists(fname):
raise FileNotFoundError(f"Could not find datadriven params file in {fname}.")
content = f"Loading tomo parameters from {fname}."
logger.warning(content)
msg = bec.logbook.LogbookMessage()
msg.add_text(content).add_tag(["Data_driven_file", "BEC"])
self.client.logbook.send_logbook_message(msg)
self._update_tomo_data_from_file(fname)
self._current_special_angles = self.special_angles.copy()
if subtomo_start == 1 and start_index is None:
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account.decode(),
bec.queue.next_scan_number,
"lamni",
"test additional info",
"BEC",
)
self.write_pdf_report()
with scans.dataset_id_on_hold:
self.sub_tomo_data_driven(start_index)
def sub_tomo_scan(self, subtomo_number=None, start_angle=None):
raise NotImplementedError(
"Cannot run sub_tomo_scan with DataDrivenLamNI. "
"Use lamni.tomo_scan(start_index=<N>) to resume instead."
)
def _at_each_angle(
self, angle=None, stepsize=None, loptz_pos=None, manual_shift_x=0, manual_shift_y=0
):
self.manual_shift_x = manual_shift_x
self.manual_shift_y = manual_shift_y
self.tomo_shellstep = stepsize
if loptz_pos is not None:
dev.rtx.controller.feedback_disable()
umv(dev.loptz, loptz_pos)
super()._at_each_angle(angle=angle)
def sub_tomo_data_driven(self, start_index=None):
"""Iterate over all projections defined in the loaded HDF5 parameter file."""
for scan_index, scan_data in enumerate(zip(*self.tomo_data.values())):
if start_index and scan_index < start_index:
continue
(
angle,
stepsize,
loptz_pos,
propagation_distance,
manual_shift_x,
manual_shift_y,
subtomo_number,
) = scan_data
bec.metadata.update(
{key: float(val) for key, val in zip(self.tomo_data.keys(), scan_data)}
)
successful = False
error_caught = False
if 0 <= angle < 360.05:
print(f"Starting LamNI scan for angle {angle}")
while not successful:
self._start_beam_check()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(
float(angle),
stepsize=float(stepsize),
loptz_pos=float(loptz_pos),
manual_shift_x=float(manual_shift_x),
manual_shift_y=float(manual_shift_y),
)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
raise exc
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
def _update_tomo_data_from_file(self, fname: str) -> None:
"""Load projection parameters from the HDF5 file into self.tomo_data."""
with h5py.File(fname, "r") as file:
self.tomo_data["theta"] = np.array([*file["theta"]]).flatten()
self.tomo_data["stepsize"] = np.array([*file["stepsize"]]).flatten()
self.tomo_data["loptz"] = np.array([*file["loptz"]]).flatten()
self.tomo_data["propagation_distance"] = np.array(
[*file["relative_propagation_distance"]]
).flatten()
self.tomo_data["manual_shift_x"] = np.array([*file["manual_shift_x"]]).flatten()
self.tomo_data["manual_shift_y"] = np.array([*file["manual_shift_y"]]).flatten()
self.tomo_data["subtomo_id"] = np.array([*file["subtomo_id"]]).flatten()
shapes = [data.shape for data in self.tomo_data.values()]
if len(set(shapes)) > 1:
raise ValueError(f"Tomo data file has entries of inconsistent lengths: {shapes}.")

File diff suppressed because it is too large Load Diff

View File

@@ -6,37 +6,44 @@ from rich.console import Console
from rich.table import Table
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
# import builtins to avoid linter errors
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class LamNIInitError(Exception):
pass
class LaMNIInitStagesMixin:
class LaMNIInitStages:
"""Handles hardware initialization and referencing of LamNI stages."""
def __init__(self, client):
super().__init__()
self.client = client
self.OMNYTools = OMNYTools(self.client)
def lamni_init_stages(self):
user_input = input("Starting initialization of LamNI stages. OK? [y/n]")
if user_input == "y":
print("staring...")
if self.OMNYTools.yesno("Start initialization of LamNI stages. OK?"):
print("starting...")
dev.lsamrot.enabled = True
else:
return
if self.check_all_axes_of_lamni_referenced():
user_input = input("Continue anyways? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
print("ok then...")
else:
return
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
user_input = input("The rotation stage will be moved to one limit [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("The rotation stage will be moved to one limit"):
print("starting...")
else:
return
@@ -47,10 +54,9 @@ class LaMNIInitStagesMixin:
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
return
user_input = input(
"Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]"
)
if user_input == "y":
if self.OMNYTools.yesno(
"Init of loptz. Can the stage move to the upstream limit without collision?"
):
print("ok then...")
else:
return
@@ -75,14 +81,14 @@ class LaMNIInitStagesMixin:
self.drive_axis_to_limit(dev.lsamy, "reverse")
self.find_reference_mark(dev.lsamy)
# the dual encoder requires the reference mark to pass on both encoders
print("Referencing lsamrot")
self.drive_axis_to_limit(dev.lsamrot, "reverse")
time.sleep(0.1)
self.find_reference_mark(dev.lsamrot)
user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno(
"Init of leye. Can the stage move to -x limit without collision?"
):
print("starting...")
else:
return
@@ -92,15 +98,6 @@ class LaMNIInitStagesMixin:
print("Referencing leyey")
self.drive_axis_to_limit(dev.leyey, "forward")
# set_lm lsamx 6 14
# set_lm lsamy 6 14
# set_lm lsamrot -3 362
# set_lm loptx -1 -0.2
# set_lm lopty 3.0 3.6
# set_lm loptz 82 87
# set_lm leyex 0 25
# set_lm leyey 0.5 50
print("Init of Smaract stages")
dev.losax.controller.find_reference_mark(2, 0, 1000, 1)
time.sleep(1)
@@ -108,15 +105,6 @@ class LaMNIInitStagesMixin:
time.sleep(1)
dev.losax.controller.find_reference_mark(1, 0, 1000, 1)
time.sleep(1)
# dev.losax.controller.find_reference_mark(3, 1, 1000, 1)
# time.sleep(1)
# dev.losax.controller.find_reference_mark(4, 1, 1000, 1)
# time.sleep(1)
# set_lm losax -1.5 0.25
# set_lm losay -2.5 4.1
# set_lm losaz -4.1 -0.5
# set_lm lcsy -1.5 5
self._align_setup()
@@ -134,8 +122,7 @@ class LaMNIInitStagesMixin:
return ord(axis_id.lower()) - 97
def _align_setup(self):
user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
if self.OMNYTools.yesno("Start moving stages to default initial positions?"):
print("Start moving stages...")
else:
print("Stopping.")
@@ -174,6 +161,8 @@ class LaMNIInitStagesMixin:
class LamNIOpticsMixin:
"""Optics movement methods: FZP, OSA, central stop and X-ray eye."""
@staticmethod
def _get_user_param_safe(device, var):
param = dev[device].user_parameter
@@ -188,13 +177,11 @@ class LamNIOpticsMixin:
umv(dev.leyey, leyey_out)
epics_put("XOMNYI-XEYE-ACQ:0", 2)
# move rotation stage to zero to avoid problems with wires
umv(dev.lsamrot, 0)
umv(dev.dttrz, 5854, dev.fttrz, 2395)
def leye_in(self):
bec.queue.next_dataset_number += 1
# move rotation stage to zero to avoid problems with wires
umv(dev.lsamrot, 0)
umv(dev.dttrz, 6419.677, dev.fttrz, 2959.979)
while True:
@@ -211,15 +198,10 @@ class LamNIOpticsMixin:
def _lfzp_in(self):
loptx_in = self._get_user_param_safe("loptx", "in")
lopty_in = self._get_user_param_safe("lopty", "in")
umv(
dev.loptx, loptx_in, dev.lopty, lopty_in
) # for 7.2567 keV and 150 mu, 60 nm fzp, loptz 83.6000 for propagation 1.4 mm
umv(dev.loptx, loptx_in, dev.lopty, lopty_in)
def lfzp_in(self):
"""
move in the lamni zone plate.
This will disable rt feedback, move the FZP and re-enabled the feedback.
"""
"""Move in the LamNI zone plate, disabling/re-enabling RT feedback around the move."""
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.controller.feedback_disable()
@@ -229,18 +211,15 @@ class LamNIOpticsMixin:
dev.rtx.controller.feedback_enable_with_reset()
def loptics_in(self):
"""
Move in the lamni optics, including the FZP and the OSA.
"""
"""Move in the LamNI optics (FZP + OSA)."""
self.lfzp_in()
self.losa_in()
def loptics_out(self):
"""Move out the lamni optics"""
"""Move out the LamNI optics."""
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.controller.feedback_disable()
# self.lcs_out()
self.losa_out()
loptx_out = self._get_user_param_safe("loptx", "out")
lopty_out = self._get_user_param_safe("lopty", "out")
@@ -251,28 +230,17 @@ class LamNIOpticsMixin:
dev.rtx.controller.feedback_enable_with_reset()
def lcs_in(self):
# umv lcsx -1.852 lcsy -0.095
pass
def lcs_out(self):
umv(dev.lcsy, 3)
def losa_in(self):
# 6.2 keV, 170 um FZP
# umv(dev.losax, -1.4450000, dev.losay, -0.1800)
# umv(dev.losaz, -1)
# 6.7, 170
# umv(dev.losax, -1.4850, dev.losay, -0.1930)
# umv(dev.losaz, 1.0000)
# 7.2, 150
losax_in = self._get_user_param_safe("losax", "in")
losay_in = self._get_user_param_safe("losay", "in")
losaz_in = self._get_user_param_safe("losaz", "in")
umv(dev.losax, losax_in, dev.losay, losay_in)
umv(dev.losaz, losaz_in)
# 11 kev
# umv(dev.losax, -1.161000, dev.losay, -0.196)
# umv(dev.losaz, 1.0000)
def losa_out(self):
losay_out = self._get_user_param_safe("losay", "out")
@@ -281,11 +249,10 @@ class LamNIOpticsMixin:
umv(dev.losay, losay_out)
def lfzp_info(self, mokev_val=-1):
if mokev_val == -1:
try:
mokev_val = dev.mokev.readback.get()
except:
except Exception:
print(
"Device mokev does not exist. You can specify the energy in keV as an argument instead."
)
@@ -320,10 +287,6 @@ class LamNIOpticsMixin:
)
console.print(table)
print("OSA Information:")
# print(f"Current losaz %.1f\n", A[losaz])
# print("The OSA will collide with the sample plane at %.1f\n\n", 89.3-A[loptz])
print(
"The numbers presented here are for a sample in the plane of the lamni sample holder.\n"
)

View File

@@ -1,22 +0,0 @@
def lamni_read_additional_correction():
# "additional_correction_shift"
# [0][] x , [1][] y, [2][] angle, [3][0] number of elements
with open("correction_lamni_um_S01405_.txt", "r") as f:
num_elements = f.readline()
int_num_elements = int(num_elements.split(" ")[2])
print(int_num_elements)
corr_pos_x = []
corr_pos_y = []
corr_angle = []
for j in range(0, int_num_elements * 3):
line = f.readline()
value = line.split(" ")[2]
name = line.split(" ")[0].split("[")[0]
if name == "corr_pos_x":
corr_pos_x.append(value)
elif name == "corr_pos_y":
corr_pos_y.append(value)
elif name == "corr_angle":
corr_angle.append(value)
return (corr_pos_x, corr_pos_y, corr_angle, num_elements)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,442 @@
"""
csaxs_dlpca200.py
=================
BEC control script for FEMTO DLPCA-200 Variable Gain Low Noise Current Amplifiers
connected to Galil RIO digital outputs.
DLPCA-200 Remote Control (datasheet page 4)
-------------------------------------------
Sub-D pin -> function:
Pin 10 -> gain LSB (digital out channel, index 0 in bit-tuple)
Pin 11 -> gain MID (digital out channel, index 1 in bit-tuple)
Pin 12 -> gain MSB (digital out channel, index 2 in bit-tuple)
Pin 13 -> coupling LOW = AC, HIGH = DC
Pin 14 -> speed mode HIGH = low noise (Pin14=1), LOW = high speed (Pin14=0)
Gain truth table (MSB, MID, LSB):
0,0,0 -> low-noise: 1e3 high-speed: 1e5
0,0,1 -> low-noise: 1e4 high-speed: 1e6
0,1,0 -> low-noise: 1e5 high-speed: 1e7
0,1,1 -> low-noise: 1e6 high-speed: 1e8
1,0,0 -> low-noise: 1e7 high-speed: 1e9
1,0,1 -> low-noise: 1e8 high-speed: 1e10
1,1,0 -> low-noise: 1e9 high-speed: 1e11
Strategy: prefer low-noise mode (1e3-1e9). For 1e10 and 1e11,
automatically fall back to high-speed mode.
Device wiring example (galilrioesxbox):
bpm4: Pin10->ch0, Pin11->ch1, Pin12->ch2, Pin13->ch3, Pin14->ch4
bim: Pin10->ch6, Pin11->ch7, Pin12->ch8, Pin13->ch9, Pin14->ch10
Usage examples
--------------
csaxs_amp = cSAXSDLPCA200(client)
csaxs_amp.set_gain("bpm4", 1e7) # low-noise if possible
csaxs_amp.set_gain("bim", 1e10) # auto high-speed
csaxs_amp.set_coupling("bpm4", "DC")
csaxs_amp.set_coupling("bim", "AC")
csaxs_amp.info("bpm4") # print current settings
csaxs_amp.info_all() # print all configured amplifiers
"""
import builtins
from bec_lib import bec_logger
logger = bec_logger.logger
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
# ---------------------------------------------------------------------------
# Amplifier registry
# ---------------------------------------------------------------------------
# Each entry describes one DLPCA-200 amplifier connected to a Galil RIO.
#
# Keys inside "channels":
# gain_lsb -> digital output channel number wired to DLPCA-200 Pin 10
# gain_mid -> digital output channel number wired to DLPCA-200 Pin 11
# gain_msb -> digital output channel number wired to DLPCA-200 Pin 12
# coupling -> digital output channel number wired to DLPCA-200 Pin 13
# speed_mode -> digital output channel number wired to DLPCA-200 Pin 14
#
# To add a new amplifier, simply extend this dict.
# ---------------------------------------------------------------------------
DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
"bpm4": {
"rio_device": "galilrioesxbox",
"description": "Beam Position Monitor 4 current amplifier",
"channels": {
"gain_lsb": 0, # Pin 10 -> Galil ch0
"gain_mid": 1, # Pin 11 -> Galil ch1
"gain_msb": 2, # Pin 12 -> Galil ch2
"coupling": 3, # Pin 13 -> Galil ch3
"speed_mode": 4, # Pin 14 -> Galil ch4
},
},
"bim": {
"rio_device": "galilrioesxbox",
"description": "Beam Intensity Monitor current amplifier",
"channels": {
"gain_lsb": 6, # Pin 10 -> Galil ch6
"gain_mid": 7, # Pin 11 -> Galil ch7
"gain_msb": 8, # Pin 12 -> Galil ch8
"coupling": 9, # Pin 13 -> Galil ch9
"speed_mode": 10, # Pin 14 -> Galil ch10
},
},
}
# ---------------------------------------------------------------------------
# DLPCA-200 gain encoding tables
# ---------------------------------------------------------------------------
# (msb, mid, lsb) -> gain in V/A
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
(0, 0, 1): int(1e4),
(0, 1, 0): int(1e5),
(0, 1, 1): int(1e6),
(1, 0, 0): int(1e7),
(1, 0, 1): int(1e8),
(1, 1, 0): int(1e9),
}
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
(0, 0, 0): int(1e5),
(0, 0, 1): int(1e6),
(0, 1, 0): int(1e7),
(0, 1, 1): int(1e8),
(1, 0, 0): int(1e9),
(1, 0, 1): int(1e10),
(1, 1, 0): int(1e11),
}
# Inverse maps: gain -> (msb, mid, lsb, low_noise_flag)
# low_noise_flag: True = Pin14 HIGH, False = Pin14 LOW
_GAIN_TO_BITS: dict[int, tuple] = {}
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
_GAIN_TO_BITS[_gain] = (*_bits, True)
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
_GAIN_TO_BITS[_gain] = (*_bits, False)
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class cSAXSDLPCA200Error(Exception):
pass
class cSAXSDLPCA200:
"""
Control class for FEMTO DLPCA-200 current amplifiers connected via Galil RIO
digital outputs in a BEC environment.
Supports:
- Forward control: set_gain(), set_coupling()
- Readback reporting: info(), info_all(), read_settings()
- Robust error handling and logging following cSAXS conventions.
"""
TAG = "[DLPCA200]"
def __init__(self, client, config: dict | None = None) -> None:
"""
Parameters
----------
client : BEC client object (passed through for future use)
config : optional override for DLPCA200_AMPLIFIER_CONFIG.
Falls back to the module-level dict if not provided.
"""
self.client = client
self._config: dict[str, dict] = config if config is not None else DLPCA200_AMPLIFIER_CONFIG
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _require_dev(self) -> None:
if dev is None:
raise cSAXSDLPCA200Error(
f"{self.TAG} BEC 'dev' namespace is not available in this session."
)
def _get_cfg(self, amp_name: str) -> dict:
"""Return config dict for a named amplifier, raising on unknown names."""
if amp_name not in self._config:
known = ", ".join(sorted(self._config.keys()))
raise cSAXSDLPCA200Error(f"{self.TAG} Unknown amplifier '{amp_name}'. Known: [{known}]")
return self._config[amp_name]
def _get_rio(self, amp_name: str):
"""Return the live RIO device object for a given amplifier."""
self._require_dev()
cfg = self._get_cfg(amp_name)
rio_name = cfg["rio_device"]
try:
rio = getattr(dev, rio_name)
except AttributeError:
raise cSAXSDLPCA200Error(f"{self.TAG} RIO device '{rio_name}' not found in BEC 'dev'.")
return rio
def _dout_get(self, rio, ch: int) -> int:
"""Read one digital output channel (returns 0 or 1)."""
attr = getattr(rio.digital_out, f"ch{ch}")
val = attr.get()
return int(val)
def _dout_set(self, rio, ch: int, value: bool) -> None:
"""Write one digital output channel (True=HIGH=1, False=LOW=0)."""
attr = getattr(rio.digital_out, f"ch{ch}")
attr.set(value)
def _read_gain_bits(self, amp_name: str) -> tuple[int, int, int, int]:
"""
Read current gain bit-state from hardware.
Returns
-------
(msb, mid, lsb, speed_mode)
speed_mode: 1 = low-noise (Pin14=HIGH), 0 = high-speed (Pin14=LOW)
"""
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
msb = self._dout_get(rio, ch["gain_msb"])
mid = self._dout_get(rio, ch["gain_mid"])
lsb = self._dout_get(rio, ch["gain_lsb"])
speed_mode = self._dout_get(rio, ch["speed_mode"])
return msb, mid, lsb, speed_mode
def _decode_gain(self, msb: int, mid: int, lsb: int, speed_mode: int) -> int | None:
"""
Decode hardware bit-state into gain value (V/A).
speed_mode=1 -> low-noise table, speed_mode=0 -> high-speed table.
Returns None if the bit combination is not in the table.
"""
bits = (msb, mid, lsb)
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
# ------------------------------------------------------------------
# Public API - control
# ------------------------------------------------------------------
def set_gain(self, amp_name: str, gain: float, force_high_speed: bool = False) -> None:
"""
Set the transimpedance gain of a DLPCA-200 amplifier.
The method automatically selects low-noise mode (Pin14=HIGH) whenever
the requested gain is achievable in low-noise mode (1e3 - 1e9 V/A).
For gains of 1e10 and 1e11 V/A, high-speed mode is used automatically.
Parameters
----------
amp_name : str
Amplifier name as defined in DLPCA200_AMPLIFIER_CONFIG (e.g. "bpm4").
gain : float or int
Target gain in V/A. Must be one of:
1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11.
force_high_speed : bool, optional
If True, force high-speed (low-noise=False) mode even for gains
below 1e10. Default: False (prefer low-noise).
Examples
--------
csaxs_amp.set_gain("bpm4", 1e7) # low-noise mode (automatic)
csaxs_amp.set_gain("bim", 1e10) # high-speed mode (automatic)
csaxs_amp.set_gain("bpm4", 1e7, force_high_speed=True) # override to high-speed
"""
gain_int = int(gain)
if gain_int not in _GAIN_TO_BITS:
valid_str = ", ".join(
f"1e{int(round(__import__('math').log10(g)))}" for g in VALID_GAINS
)
raise cSAXSDLPCA200Error(
f"{self.TAG} Invalid gain {gain:.2e} V/A for '{amp_name}'. "
f"Valid values: {valid_str}"
)
msb, mid, lsb, low_noise_preferred = _GAIN_TO_BITS[gain_int]
# Apply force_high_speed override
if force_high_speed and low_noise_preferred:
# Check if this gain is achievable in high-speed mode
hs_entry = next(
(bits for bits, g in _GAIN_BITS_HIGH_SPEED.items() if g == gain_int), None
)
if hs_entry is None:
raise cSAXSDLPCA200Error(
f"{self.TAG} Gain {gain:.2e} V/A is not achievable in high-speed mode "
f"for '{amp_name}'."
)
msb, mid, lsb = hs_entry
low_noise_preferred = False
use_low_noise = low_noise_preferred and not force_high_speed
try:
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
self._dout_set(rio, ch["gain_msb"], bool(msb))
self._dout_set(rio, ch["gain_mid"], bool(mid))
self._dout_set(rio, ch["gain_lsb"], bool(lsb))
self._dout_set(rio, ch["speed_mode"], use_low_noise) # True=low-noise
mode_str = "low-noise" if use_low_noise else "high-speed"
logger.info(
f"{self.TAG} [{amp_name}] gain set to {gain_int:.2e} V/A "
f"({mode_str} mode, bits MSB={msb} MID={mid} LSB={lsb})"
)
print(
f"{amp_name}: gain -> {gain_int:.2e} V/A [{mode_str}] "
f"(bits: MSB={msb} MID={mid} LSB={lsb})"
)
except cSAXSDLPCA200Error:
raise
except Exception as exc:
raise cSAXSDLPCA200Error(
f"{self.TAG} Failed to set gain on '{amp_name}': {exc}"
) from exc
def set_coupling(self, amp_name: str, coupling: str) -> None:
"""
Set AC or DC coupling on a DLPCA-200 amplifier.
Parameters
----------
amp_name : str
Amplifier name (e.g. "bpm4", "bim").
coupling : str
"AC" or "DC" (case-insensitive).
DC -> Pin13 HIGH, AC -> Pin13 LOW.
Examples
--------
csaxs_amp.set_coupling("bpm4", "DC")
csaxs_amp.set_coupling("bim", "AC")
"""
coupling_upper = coupling.strip().upper()
if coupling_upper not in ("AC", "DC"):
raise cSAXSDLPCA200Error(
f"{self.TAG} Invalid coupling '{coupling}' for '{amp_name}'. " f"Use 'AC' or 'DC'."
)
pin13_high = coupling_upper == "DC"
try:
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
self._dout_set(rio, ch["coupling"], pin13_high)
logger.info(f"{self.TAG} [{amp_name}] coupling set to {coupling_upper}")
print(f"{amp_name}: coupling -> {coupling_upper}")
except cSAXSDLPCA200Error:
raise
except Exception as exc:
raise cSAXSDLPCA200Error(
f"{self.TAG} Failed to set coupling on '{amp_name}': {exc}"
) from exc
# ------------------------------------------------------------------
# Public API - readback / reporting
# ------------------------------------------------------------------
def read_settings(self, amp_name: str) -> dict:
"""
Read back the current settings from hardware digital outputs.
Returns
-------
dict with keys:
"amp_name" : str
"gain" : int or None - gain in V/A (None if unknown bit pattern)
"mode" : str - "low-noise" or "high-speed"
"coupling" : str - "AC" or "DC"
"bits" : dict - raw bit values {msb, mid, lsb, speed_mode, coupling}
"""
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
msb = self._dout_get(rio, ch["gain_msb"])
mid = self._dout_get(rio, ch["gain_mid"])
lsb = self._dout_get(rio, ch["gain_lsb"])
speed_mode = self._dout_get(rio, ch["speed_mode"])
coupling_bit = self._dout_get(rio, ch["coupling"])
gain = self._decode_gain(msb, mid, lsb, speed_mode)
mode = "low-noise" if speed_mode else "high-speed"
coupling = "DC" if coupling_bit else "AC"
return {
"amp_name": amp_name,
"gain": gain,
"mode": mode,
"coupling": coupling,
"bits": {
"msb": msb,
"mid": mid,
"lsb": lsb,
"speed_mode": speed_mode,
"coupling": coupling_bit,
},
}
def info(self, amp_name: str) -> None:
"""
Print a plain summary of the current settings for one amplifier.
Example output
--------------
Amplifier : bpm4
Description : Beam Position Monitor 4 current amplifier
RIO device : galilrioesxbox
Gain : 1.00e+07 V/A
Mode : low-noise
Coupling : DC
Raw bits : MSB=1 MID=0 LSB=0 speed=1 coup=1
"""
cfg = self._get_cfg(amp_name)
try:
s = self.read_settings(amp_name)
except Exception as exc:
print(f"{self.TAG} [{amp_name}] Could not read settings: {exc}")
return
gain_str = (
f"{s['gain']:.2e} V/A" if s["gain"] is not None else "UNKNOWN (invalid bit pattern)"
)
bits = s["bits"]
print(f" {'Amplifier':<12}: {amp_name}")
print(f" {'Description':<12}: {cfg.get('description', '')}")
print(f" {'RIO device':<12}: {cfg['rio_device']}")
print(f" {'Gain':<12}: {gain_str}")
print(f" {'Mode':<12}: {s['mode']}")
print(f" {'Coupling':<12}: {s['coupling']}")
print(
f" {'Raw bits':<12}: MSB={bits['msb']} MID={bits['mid']} LSB={bits['lsb']} speed={bits['speed_mode']} coup={bits['coupling']}"
)
def info_all(self) -> None:
"""
Print a plain summary for ALL configured amplifiers.
"""
print("\nDLPCA-200 Amplifier Status Report")
print("-" * 40)
for amp_name in sorted(self._config.keys()):
self.info(amp_name)
print()
def list_amplifiers(self) -> list[str]:
"""Return sorted list of configured amplifier names."""
return sorted(self._config.keys())

View File

@@ -41,8 +41,10 @@ import builtins
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class cSAXSFilterTransmission:
"""

View File

@@ -8,11 +8,14 @@ from bec_lib import bec_logger
logger = bec_logger.logger
# Pull BEC globals if present
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class cSAXSInitSmaractStagesError(Exception):
pass
@@ -383,7 +386,6 @@ class cSAXSInitSmaractStages:
if not self._yesno("Proceed with the motions listed above?", "y"):
logger.info("[cSAXS] Motion to initial position aborted by user.")
return
# --- Execution phase (SIMULTANEOUS MOTION) ---
if umv is None:
logger.error("[cSAXS] 'umv' is not available in this session.")

View File

@@ -22,8 +22,10 @@ logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class FlomniToolsError(Exception):
@@ -35,32 +37,32 @@ class FlomniInitError(Exception):
class FlomniError(Exception):
pass
class FlomniTools:
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
if autoconfirm and default == "y":
self.printgreen(message + " Automatically confirming default: yes")
return True
elif autoconfirm and default == "n":
self.printgreen(message + " Automatically confirming default: no")
return False
if default == "y":
message_ending = " [Y]/n? "
elif default == "n":
message_ending = " y/[N]? "
else:
message_ending = " y/n? "
while True:
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
if (
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
) or (default == "y" and user_input == ""):
return True
if (
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
) or (default == "n" and user_input == ""):
return False
else:
print("Please expicitely confirm y or n.")
# class FlomniTools:
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
# if autoconfirm and default == "y":
# self.printgreen(message + " Automatically confirming default: yes")
# return True
# elif autoconfirm and default == "n":
# self.printgreen(message + " Automatically confirming default: no")
# return False
# if default == "y":
# message_ending = " [Y]/n? "
# elif default == "n":
# message_ending = " y/[N]? "
# else:
# message_ending = " y/n? "
# while True:
# user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
# if (
# user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
# ) or (default == "y" and user_input == ""):
# return True
# if (
# user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
# ) or (default == "n" and user_input == ""):
# return False
# else:
# print("Please expicitely confirm y or n.")

View File

@@ -7,8 +7,10 @@ from bec_widgets.cli.client import BECDockArea
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class flomniGuiToolsError(Exception):

View File

@@ -13,8 +13,10 @@ logger = bec_logger.logger
# import builtins to avoid linter errors
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
if TYPE_CHECKING:
from bec_ipython_client.plugins.flomni import Flomni

View File

@@ -7,8 +7,10 @@ from bec_widgets.cli.client import BECDockArea
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYGuiToolsError(Exception):

View File

@@ -27,9 +27,10 @@ logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYInitError(Exception):
pass

View File

@@ -16,8 +16,10 @@ from rich.table import Table
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYToolsError(Exception):

View File

@@ -16,8 +16,10 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fsh
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYTransferError(Exception):

View File

@@ -13,8 +13,10 @@ logger = bec_logger.logger
# import builtins to avoid linter errors
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
if TYPE_CHECKING:
from bec_ipython_client.plugins.omny import OMNY

View File

@@ -30,29 +30,74 @@ logger = bec_logger.logger
logger.info("Using the cSAXS startup script.")
# pylint: disable=import-error
_args = _main_dict["args"]
_session_name = "cSAXS"
if _args.session.lower() == "lamni":
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
from csaxs_bec.bec_ipython_client.plugins.LamNI import *
_session_name = "LamNI"
lamni = LamNI(bec)
logger.success("LamNI session loaded.")
elif _args.session.lower() == "csaxs":
print("Loading cSAXS session")
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
logger.success("cSAXS session loaded.")
from csaxs_bec.bec_ipython_client.plugins.tool_box.debug_tools import DebugTools
debug = DebugTools()
logger.success("Debug tools loaded. Use 'debug' to access them.")
# pylint: disable=import-error
_args = _main_dict["args"]
_session_name = "cSAXS"
print("Loading cSAXS session")
from csaxs_bec.bec_ipython_client.plugins.cSAXS.cSAXS import cSAXS
csaxs = cSAXS(bec)
logger.success("cSAXS session loaded.")
if _args.session.lower() == "lamni":
from csaxs_bec.bec_ipython_client.plugins.LamNI import LamNI
_session_name = "LamNI"
lamni = LamNI(bec)
logger.success("LamNI session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ██╗ █████╗ ███╗ ███╗███╗ ██╗██╗
██╔══██╗██╔════╝██╔════╝ ██║ ██╔══██╗████╗ ████║████╗ ██║██║
██████╔╝█████╗ ██║ ██║ ███████║██╔████╔██║██╔██╗ ██║██║
██╔══██╗██╔══╝ ██║ ██║ ██╔══██║██║╚██╔╝██║██║╚██╗██║██║
██████╔╝███████╗╚██████╗ ███████╗██║ ██║██║ ╚═╝ ██║██║ ╚████║██║
╚═════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
B E C L a m N I
""")
elif _args.session.lower() == "omny":
from csaxs_bec.bec_ipython_client.plugins.flomni import OMNY
_session_name = "OMNY"
omny = OMNY(bec)
logger.success("OMNY session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ██████╗ ███╗ ███╗███╗ ██╗██╗ ██╗
██╔══██╗██╔════╝██╔════╝ ██╔═══██╗████╗ ████║████╗ ██║╚██╗ ██╔╝
██████╔╝█████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║ ╚████╔╝
██╔══██╗██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║ ╚██╔╝
██████╔╝███████╗╚██████╗ ╚██████╔╝██║ ╚═╝ ██║██║ ╚████║ ██║
╚═════╝ ╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝
B E C O M N Y
""")
elif _args.session.lower() == "flomni":
from csaxs_bec.bec_ipython_client.plugins.flomni import Flomni
_session_name = "flomni"
flomni = Flomni(bec)
logger.success("flomni session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗
██╔══██╗██╔════╝██╔════╝ ██╔════╝██║ ██╔═══██╗████╗ ████║████╗ ██║██║
██████╔╝█████╗ ██║ █████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║██║
██╔══██╗██╔══╝ ██║ ██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║██║
██████╔╝███████╗╚██████╗ ██║ ███████╗╚██████╔╝██║ ╚═╝ ██║██║ ╚████║██║
╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
B E C f l O M N I
""")
# SETUP BEAMLINE INFO
from bec_ipython_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo

View File

@@ -13,69 +13,10 @@ logger = bec_logger.logger
_Widgets = {
"OmnyAlignment": "OmnyAlignment",
"XRayEye": "XRayEye",
}
class OmnyAlignment(RPCBase):
@property
@rpc_call
def enable_live_view(self):
"""
None
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
None
"""
@property
@rpc_call
def user_message(self):
"""
None
"""
@user_message.setter
@rpc_call
def user_message(self):
"""
None
"""
@property
@rpc_call
def sample_name(self):
"""
None
"""
@sample_name.setter
@rpc_call
def sample_name(self):
"""
None
"""
@property
@rpc_call
def enable_move_buttons(self):
"""
None
"""
@enable_move_buttons.setter
@rpc_call
def enable_move_buttons(self):
"""
None
"""
class XRayEye(RPCBase):
@rpc_call
def active_roi(self) -> "BaseROI | None":
@@ -83,20 +24,6 @@ class XRayEye(RPCBase):
Return the currently active ROI, or None if no ROI is active.
"""
@property
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@property
@rpc_call
def user_message(self):

View File

@@ -1,140 +0,0 @@
from typing import TypedDict
from bec_widgets.utils.error_popups import SafeSlot
import os
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QWidget, QPushButton, QLineEdit, QLabel, QVBoxLayout
from bec_qthemes import material_icon
from bec_lib.logger import bec_logger
logger = bec_logger.logger
# class OmnyAlignmentUIComponents(TypedDict):
# moveRightButton: QPushButton
# moveLeftButton: QPushButton
# moveUpButton: QPushButton
# moveDownButton: QPushButton
# image: Image
class OmnyAlignment(BECWidget, QWidget):
USER_ACCESS = ["enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter","sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
ui_file = "./omny_alignment.ui"
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.ui.moveRightButton.setText("")
self.ui.moveRightButton.setIcon(
material_icon(icon_name="keyboard_arrow_right", **icon_options)
)
self.ui.moveLeftButton.setText("")
self.ui.moveLeftButton.setIcon(
material_icon(icon_name="keyboard_arrow_left", **icon_options)
)
self.ui.moveUpButton.setText("")
self.ui.moveUpButton.setIcon(
material_icon(icon_name="keyboard_arrow_up", **icon_options)
)
self.ui.moveDownButton.setText("")
self.ui.moveDownButton.setIcon(
material_icon(icon_name="keyboard_arrow_down", **icon_options)
)
self.ui.confirmButton.setText("OK")
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
@property
def enable_live_view(self):
return self.ui.liveViewSwitch.checked
@enable_live_view.setter
def enable_live_view(self, enable:bool):
self.ui.liveViewSwitch.checked = enable
@property
def user_message(self):
return self.ui.messageLineEdit.text()
@user_message.setter
def user_message(self, message:str):
self.ui.messageLineEdit.setText(message)
@property
def sample_name(self):
return self.ui.sampleLineEdit.text()
@sample_name.setter
def sample_name(self, message:str):
self.ui.sampleLineEdit.setText(message)
@SafeSlot(bool)
def on_live_view_enabled(self, enabled:bool):
from bec_widgets.widgets.plots.image.image import Image
logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image
if enabled:
image.image("cam_xeye")
return
image.disconnect_monitor("cam_xeye")
@property
def enable_move_buttons(self):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
return move_up.isEnabled() and move_down.isEnabled() and move_left.isEnabled() and move_right.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled:bool):
move_up:QPushButton = self.ui.moveUpButton
move_down:QPushButton = self.ui.moveDownButton
move_left:QPushButton = self.ui.moveLeftButton
move_right:QPushButton = self.ui.moveRightButton
move_up.setEnabled(enabled)
move_down.setEnabled(enabled)
move_left.setEnabled(enabled)
move_right.setEnabled(enabled)
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
widget = OmnyAlignment()
widget.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['omny_alignment.py']}

View File

@@ -1,125 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>988</width>
<height>821</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="2" column="2">
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="2">
<widget class="QPushButton" name="moveRightButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QPushButton" name="moveLeftButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QPushButton" name="moveUpButton">
<property name="text">
<string>Up</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QPushButton" name="moveDownButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QPushButton" name="confirmButton">
<property name="text">
<string>PushButton</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="2" column="0">
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="1">
<widget class="QLineEdit" name="sampleLineEdit"/>
</item>
<item row="1" column="1">
<widget class="QLineEdit" name="messageLineEdit"/>
</item>
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="text">
<string>Sample</string>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Message</string>
</property>
</widget>
</item>
</layout>
</item>
<item row="1" column="0" colspan="3">
<widget class="Image" name="image">
<property name="enable_toolbar" stdset="0">
<bool>false</bool>
</property>
<property name="inner_axes" stdset="0">
<bool>false</bool>
</property>
<property name="monitor" stdset="0">
<string>cam_xeye</string>
</property>
<property name="rotation" stdset="0">
<number>3</number>
</property>
</widget>
</item>
<item row="0" column="3">
<widget class="ToggleSwitch" name="liveViewSwitch"/>
</item>
<item row="0" column="2">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Live View</string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
</property>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>Image</class>
<extends>QWidget</extends>
<header>image</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1,54 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment import OmnyAlignment
DOM_XML = """
<ui language='c++'>
<widget class='OmnyAlignment' name='omny_alignment'>
</widget>
</ui>
"""
class OmnyAlignmentPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = OmnyAlignment(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(OmnyAlignment.ICON_NAME)
def includeFile(self):
return "omny_alignment"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "OmnyAlignment"
def toolTip(self):
return "OmnyAlignment"
def whatsThis(self):
return self.toolTip()

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment_plugin import OmnyAlignmentPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(OmnyAlignmentPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -33,46 +33,48 @@ class XRayEye2DControl(BECWidget, QWidget):
self.get_bec_shortcuts()
self._step_size = step_size
self.root_layout = QGridLayout(self)
self.setStyleSheet("""
self.setStyleSheet(
"""
QToolButton {
border: 1px solid;
border-radius: 4px;
}
""")
"""
)
# Up
self.move_up_button = QToolButton(parent=self)
self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
self.move_up_button.setIcon(material_icon("keyboard_double_arrow_up"))
self.root_layout.addWidget(self.move_up_button, 0, 2)
# Up tweak button
self.move_up_tweak_button = QToolButton(parent=self)
self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
self.move_up_tweak_button.setIcon(material_icon("keyboard_arrow_up"))
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
# Left
self.move_left_button = QToolButton(parent=self)
self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
self.move_left_button.setIcon(material_icon("keyboard_double_arrow_left"))
self.root_layout.addWidget(self.move_left_button, 2, 0)
# Left tweak button
self.move_left_tweak_button = QToolButton(parent=self)
self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
self.move_left_tweak_button.setIcon(material_icon("keyboard_arrow_left"))
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
# Right
self.move_right_button = QToolButton(parent=self)
self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
self.move_right_button.setIcon(material_icon("keyboard_double_arrow_right"))
self.root_layout.addWidget(self.move_right_button, 2, 4)
# Right tweak button
self.move_right_tweak_button = QToolButton(parent=self)
self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
self.move_right_tweak_button.setIcon(material_icon("keyboard_arrow_right"))
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
# Down
self.move_down_button = QToolButton(parent=self)
self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
self.move_down_button.setIcon(material_icon("keyboard_double_arrow_down"))
self.root_layout.addWidget(self.move_down_button, 4, 2)
# Down tweak button
self.move_down_tweak_button = QToolButton(parent=self)
self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
self.move_down_tweak_button.setIcon(material_icon("keyboard_arrow_down"))
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
# Connections
@@ -124,8 +126,15 @@ class XRayEye2DControl(BECWidget, QWidget):
class XRayEye(BECWidget, QWidget):
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
USER_ACCESS = [
"active_roi",
"user_message",
"user_message.setter",
"sample_name",
"sample_name.setter",
"enable_move_buttons",
"enable_move_buttons.setter",
]
PLUGIN = True
def __init__(self, parent=None, **kwargs):
@@ -136,7 +145,9 @@ class XRayEye(BECWidget, QWidget):
self._make_connections()
# Connection to redis endpoints
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
self.bec_dispatcher.connect_slot(
self.device_updates, MessageEndpoints.device_readback("omny_xray_gui")
)
self.connect_motors()
self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger)
@@ -145,7 +156,9 @@ class XRayEye(BECWidget, QWidget):
self.core_layout = QHBoxLayout(self)
self.image = Image(parent=self)
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
self.image.enable_toolbar = (
False # Disable default toolbar to not allow to user set anything
)
self.image.inner_axes = False # Disable inner axes to maximize image area
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
@@ -156,8 +169,9 @@ class XRayEye(BECWidget, QWidget):
self.control_panel_layout.setSpacing(10)
# ROI toolbar + Live toggle (header row)
self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
compact_orientation="horizontal")
self.roi_manager = ROIPropertyTree(
parent=self, image_widget=self.image, compact=True, compact_orientation="horizontal"
)
header_row = QHBoxLayout()
header_row.setContentsMargins(0, 0, 0, 0)
header_row.setSpacing(8)
@@ -230,7 +244,9 @@ class XRayEye(BECWidget, QWidget):
# Make connections
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
self.step_size.valueChanged.connect(
lambda x: self.motor_control_2d.setProperty("step_size", x)
)
self.submit_button.clicked.connect(self.submit)
def _create_separator(self):
@@ -248,12 +264,14 @@ class XRayEye(BECWidget, QWidget):
################################################################################
def connect_motors(self):
""" Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
"""Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ["osamroy", "lsamrot", "fsamroy"]
for motor in possible_motors:
if motor in self.dev:
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
self.bec_dispatcher.connect_slot(
self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor)
)
logger.info(f"Succesfully connected to {motor}")
################################################################################
@@ -341,7 +359,7 @@ class XRayEye(BECWidget, QWidget):
@SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict):
#TODO implement if needed
# TODO implement if needed
print(f"data: {data}")
print(f"meta: {meta}")
@@ -355,25 +373,25 @@ class XRayEye(BECWidget, QWidget):
meta(dict): metadata from device
"""
signals = data.get('signals')
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
signals = data.get("signals")
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get("value")
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get("value")
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get("value")
self.on_live_view_enabled(bool(enable_live_preview))
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
# Signals from epics gui device
# send message
user_message = signals.get("omny_xray_gui_send_message").get('value')
user_message = signals.get("omny_xray_gui_send_message").get("value")
self.user_message = user_message
# sample name
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
sample_message = signals.get("omny_xray_gui_sample_name").get("value")
self.sample_name = sample_message
# enable frame acquisition
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get("value")
self.on_live_view_enabled(bool(update_frame_acq))
# enable submit button
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
enable_submit_button = signals.get("omny_xray_gui_submit").get("value")
self.enable_submit_button(enable_submit_button)
@SafeSlot()
@@ -383,35 +401,40 @@ class XRayEye(BECWidget, QWidget):
logger.warning("No active ROI")
return
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
roi_center_x = roi_coordinates['center_x']
roi_center_y = roi_coordinates['center_y']
roi_center_x = roi_coordinates["center_x"]
roi_center_y = roi_coordinates["center_y"]
# Case of rectangular ROI
if isinstance(self.roi_manager.single_active_roi, RectangularROI):
roi_width = roi_coordinates['width']
roi_height = roi_coordinates['height']
roi_width = roi_coordinates["width"]
roi_height = roi_coordinates["height"]
elif isinstance(self.roi_manager.single_active_roi, CircularROI):
roi_width = roi_coordinates['diameter']
roi_height = roi_coordinates['radius']
roi_width = roi_coordinates["diameter"]
roi_height = roi_coordinates["radius"]
else:
logger.warning("Unsupported ROI type for submit action.")
return
print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
print(
f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}"
) # TODO remove when will be not needed for debugging
# submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get("value"))
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1)
def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
getattr(self.dev,CAMERA[0]).live_mode = False
self.bec_dispatcher.disconnect_slot(
self.device_updates, MessageEndpoints.device_readback("omny_xray_gui")
)
getattr(self.dev, CAMERA[0]).live_mode = False
super().cleanup()
if __name__ == "__main__":
import sys

View File

@@ -9,27 +9,27 @@ eiger_1_5:
readoutPriority: async
softwareTrigger: False
eiger_9:
description: Eiger 9M detector
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
deviceConfig:
detector_distance: 100
beam_center: [0, 0]
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: False
# eiger_9:
# description: Eiger 9M detector
# deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
# deviceConfig:
# detector_distance: 100
# beam_center: [0, 0]
# onFailure: raise
# enabled: true
# readoutPriority: async
# softwareTrigger: False
ids_cam:
description: IDS camera for live image acquisition
deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
deviceConfig:
camera_id: 201
bits_per_pixel: 24
m_n_colormode: 1
live_mode: True
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: True
# ids_cam:
# description: IDS camera for live image acquisition
# deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
# deviceConfig:
# camera_id: 201
# bits_per_pixel: 24
# m_n_colormode: 1
# live_mode: True
# onFailure: raise
# enabled: true
# readoutPriority: async
# softwareTrigger: True

View File

@@ -0,0 +1,25 @@
############################################################
##################### EPS ##################################
############################################################
x12saEPS:
description: X12SA EPS info and control
deviceClass: csaxs_bec.devices.epics.eps.EPS
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
##################### GalilRIO #############################
############################################################
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesxbox.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20

View File

@@ -1,11 +1,11 @@
# This is the main configuration file that is
# commented or uncommented according to the type of experiment
optics:
- !include ./bl_optics_hutch.yaml
# optics:
# - !include ./bl_optics_hutch.yaml
frontend:
- !include ./bl_frontend.yaml
# frontend:
# - !include ./bl_frontend.yaml
endstation:
- !include ./bl_endstation.yaml
@@ -16,8 +16,8 @@ detectors:
#sastt:
# - !include ./sastt.yaml
#flomni:
# - !include ./ptycho_flomni.yaml
flomni:
- !include ./ptycho_flomni.yaml
#omny:
# - !include ./ptycho_omny.yaml

View File

@@ -471,7 +471,7 @@ omnyfsh:
#################### GUI Signals ###########################
############################################################
omny_xray_gui:
description: Gui Epics signals
description: Gui signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
deviceConfig: {}
enabled: true
@@ -486,4 +486,25 @@ calculated_signal:
compute_method: "def just_rand():\n return 42"
enabled: true
readOnly: false
readoutPriority: baseline
readoutPriority: baseline
############################################################
#################### OMNY Pandabox #########################
############################################################
omny_panda:
readoutPriority: async
deviceClass: csaxs_bec.devices.panda_box.panda_box_omny.PandaBoxOMNY
deviceConfig:
host: omny-panda.psi.ch
signal_alias:
FMC_IN.VAL1.Min: cap_voltage_fzp_y_min
FMC_IN.VAL1.Max: cap_voltage_fzp_y_max
FMC_IN.VAL1.Mean: cap_voltage_fzp_y_mean
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
deviceTags:
- detector
enabled: true
readOnly: false
softwareTrigger: false

View File

@@ -309,7 +309,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Shutter opens without delay at t0, closes after exp_time * burst_count + 2ms (self._shutter_to_open_delay)
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
self.set_delay_pairs(channel="gh", delay=self._shutter_to_open_delay, width=(shutter_width-self._shutter_to_open_delay))
self.set_delay_pairs(
channel="gh",
delay=self._shutter_to_open_delay,
width=(shutter_width - self._shutter_to_open_delay),
)
# Trigger extra pulse for MCS OR gate
# f = e + 1us
@@ -520,7 +524,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
- Return the status object to BEC which will automatically resolve once the status register has
the END_OF_BURST bit set. The callback of the status object will also stop the polling loop.
"""
overall_start = time.time()
self._stop_polling()
# NOTE If the trigger source is not SINGLE_SHOT, the DDG is triggered by an external source
@@ -559,7 +562,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Send trigger
self.trigger_shot.put(1, use_complete=True)
self.cancel_on_stop(status)
logger.info(f"Configured ddg in {time.time()-overall_start}")
return status
def on_stop(self) -> None:

View File

@@ -261,7 +261,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
**kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance).
"""
with self._rlock:
logger.info(f"Received update on mcs card {self.name}")
if self._omit_mca_callbacks.is_set():
return # Suppress callbacks when erasing all channels
self._mca_counter_index += 1
@@ -293,9 +292,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
)
# Once we have received all channels, push data to BEC and reset for next accumulation
logger.info(
f"Received update for {attr_name}, index {self._mca_counter_index}/{self.NUM_MCA_CHANNELS}"
)
if len(self._current_data) == self.NUM_MCA_CHANNELS:
logger.debug(
f"Current data index {self._current_data_index} complete, pushing to BEC."
@@ -396,13 +392,15 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._current_data_index = 0
# NOTE Make sure that the signal that omits mca callbacks is cleared
# DO NOT REMOVE!!
self._omit_mca_callbacks.clear()
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
# For a fly scan we need to start the mcs card ourselves
if self.scan_info.msg.scan_type == "fly":
self.erase_start.put(1)
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_prescan(self) -> None | StatusBase:
"""
This method is called after on_stage and before the scan starts. For the MCS card, we need to make sure
@@ -446,7 +444,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
while not self._scan_done_thread_kill_event.is_set():
while self._start_monitor_async_data_emission.wait():
try:
logger.debug(f"Monitoring async data emission for {self.name}...")
if (
hasattr(self.scan_info.msg, "num_points")
and self.scan_info.msg.num_points is not None
@@ -456,7 +453,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
for callback in self._scan_done_callbacks:
callback(exception=None)
else:
logger.info(f"Current data index is {self._current_data_index}")
if self._current_data_index >= 1:
for callback in self._scan_done_callbacks:
callback(exception=None)
@@ -552,8 +548,9 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
def on_stop(self) -> None:
"""Hook called when the device is stopped. In addition, any status that is registered through cancel_on_stop will be cancelled here."""
self.stop_all.put(1)
self.erase_all.put(1)
with suppress_mca_callbacks(self):
self.stop_all.put(1)
self.erase_all.put(1)
def mcs_recovery(self, timeout: int = 1) -> None:
"""

View File

@@ -132,7 +132,6 @@ class Eiger(PSIDeviceBase):
if data is None:
logger.error(f"Received image message on device {self.name} without data.")
return
logger.info(f"Received preview image on device {self.name}")
self.preview_image.put(data)
# pylint: disable=missing-function-docstring

View File

@@ -4,6 +4,7 @@ This module contains the base class for Galil controllers as well as the signals
import functools
import time
from typing import Any
from bec_lib import bec_logger
from ophyd.utils import ReadOnlyError
@@ -347,7 +348,7 @@ class GalilSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller = self.parent.controller
self.controller = self.root.controller if hasattr(self.root, "controller") else None
class GalilSignalRO(GalilSignalBase):

View File

@@ -6,24 +6,27 @@ Link to the Galil RIO vendor page:
https://www.galil.com/plcs/remote-io/rio-471xx
This module provides the GalilRIOController for communication with the RIO controller
over TCP/IP. It also provides a device integration that interfaces to these
8 analog channels.
over TCP/IP. It also provides a device integration that interfaces to its
8 analog channels, and 16 digital output channels. Some PLCs may have 24 digital output channels,
which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS variable.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DynamicDeviceComponent as DDC
from ophyd import Kind
from ophyd.utils import ReadOnlyError
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO
from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilCommunicationError,
GalilSignalRO,
GalilSignalBase,
retry_once,
)
@@ -35,15 +38,11 @@ logger = bec_logger.logger
class GalilRIOController(Controller):
"""
Controller Class for Galil RIO controller communication.
Multiple controllers are in use at the cSAXS beamline:
- 129.129.98.64 (port 23)
"""
"""Controller Class for Galil RIO controller communication."""
@threadlocked
def socket_put(self, val: str) -> None:
"""Socker put method."""
self.sock.put(f"{val}\r".encode())
@retry_once
@@ -64,21 +63,123 @@ class GalilRIOController(Controller):
)
class GalilRIOSignalRO(GalilSignalRO):
class GalilRIOAnalogSignalRO(GalilSignalBase):
"""
Read-only Signal for reading a single analog input channel from the Galil RIO controller.
It always read all 8 analog channels at once, and updates the reabacks of all channels.
New readbacks are only fetched from the controller if the last readback is older than
_READ_TIMEOUT seconds, otherwise the last cached readback is returned to reduce network traffic.
Signal for reading analog input channels of the Galil RIO controller. This signal is read-only, so
the set method raises a ReadOnlyError. The get method retrieves the values of all analog
channels in a single socket command. The readback values of all channels are updated based
on the response, and subscriptions are run for all channels. Readings are cached as implemented
in the SocketSignal class, so that multiple reads of the same channel within an update cycle do
not result in multiple socket calls.
Args:
signal_name (str): Name of the signal.
channel (int): Analog channel number (0-7).
parent (GalilRIO): Parent GalilRIO device.
signal_name (str): The name of the signal, e.g. "ch0", "ch1", ..., "ch7"
channel (int): The channel number corresponding to the signal, e.g. 0 for "ch0", 1 for "ch1", ...
parent (GalilRIO): The parent device instance that this signal belongs to.
"""
_NUM_ANALOG_CHANNELS = 8
_READ_TIMEOUT = 0.1 # seconds
READBACK_TIMEOUT = 0.1 # time to wait in between two readback attemps in seconds, otherwise return cached value
def __init__(
self,
signal_name: str,
channel: int,
parent: GalilRIO,
readback_timeout: float = None,
**kwargs,
):
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
self._channel = channel
self._metadata["connected"] = False
self._readback_timeout = (
readback_timeout if readback_timeout is not None else self.READBACK_TIMEOUT
)
self._metadata["write_access"] = False
self._last_readback = 0.0
def get(self):
current_time = time.monotonic()
if current_time - self._last_readback > self._readback_timeout:
old_value = self._readback
self._last_readback = current_time # _socket_get may rely on this value to be set.
self._readback = self._socket_get()
self._run_subs(
sub_type=self.SUB_VALUE,
old_value=old_value,
value=self._readback,
timestamp=current_time,
)
return self._readback
def _socket_set(self, val):
"""Read-only signal, so set method raises an error."""
raise ReadOnlyError(f"Signal {self.name} is read-only.")
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ", @".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# Run updates for all channels. This also updates the _readback and metadata timestamp
# value of this channel.
self._update_all_channels(values)
return self._readback
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
"""
Method to receive a list of readback values for channels 0 to 7. Updates for each channel idx
are applied to the corresponding GalilRIOAnalogSignalRO signal with matching attr_name "ch{idx}".
We also update the _last_readback attribute of each of the signals, to avoid multiple socket calls,
but rather use the cached value of the combined reading for all channels.
Args:
values (list[float]): List of new readback values for all channels, where the
index corresponds to the channel number (0-7).
"""
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if isinstance(walk.item, GalilRIOAnalogSignalRO):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
new_val = values[idx]
walk.item._metadata["timestamp"] = self._last_readback
walk.item._last_readback = self._last_readback
walk.item._readback = new_val
if (
idx != self._channel
): # Only run subscriptions on other channels, not on itself
# as this is handled by the SocketSignal and we want to avoid running multiple
# subscriptions for the same channel update
updates[walk.item.attr_name] = (new_val, old_val)
else:
logger.warning(
f"Received {len(values)} values but found channel index {idx} in signal {walk.item.name}. Skipping update for this signal."
)
# Run subscriptions after all readbacks have been updated
# on all channels except the one that triggered the update
# TODO for now skip running subscribers, this should be re-implemented
# once we properly handle subscriptions from bec running "read"
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=self._last_readback,
)
class GalilRIODigitalOutSignal(GalilSignalBase):
"""Signal for controlling digital outputs of the Galil RIO controller."""
_NUM_DIGITAL_OUTPUT_CHANNELS = 16
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name, parent=parent, **kwargs)
@@ -87,81 +188,78 @@ class GalilRIOSignalRO(GalilSignalRO):
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ",@".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
cmd = f"MG@OUT[{self._channel}]"
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# This updates all channels' readbacks, including self._readback
self._update_all_channels(values)
self._readback = float(ret.strip())
return self._readback
def get(self):
"""Get current analog channel values from the Galil RIO controller."""
# If the last readback has happend more than _READ_TIMEOUT seconds ago, read all channels again
if time.monotonic() - self.parent.last_readback > self._READ_TIMEOUT:
self._readback = self._socket_get()
return self._readback
def _socket_set(self, val: Literal[0, 1]) -> None:
"""Set command for the digital output signal. Value should be 0 or 1."""
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
"""
Update all analog channel readbacks based on the provided list of values.
List of values must be in order from an_ch0 to an_ch7.
if val not in (0, 1):
raise ValueError("Digital output value must be 0 or 1.")
cmd = f"SB{self._channel}" if val == 1 else f"CB{self._channel}"
self.controller.socket_put_confirmed(cmd)
We first have to update the _last_readback timestamp of the GalilRIO parent device.
Then we update all readbacks of all an_ch channels, before we run any subscriptions.
This ensures that all readbacks are updated before any subscriptions are run, which
may themselves read other channels.
Args:
values (list[float]): List of 8 float values corresponding to the analog channels.
They must be in order from an_ch0 to an_ch7.
"""
timestamp = time.time()
# Update parent's last readback before running subscriptions!!
self.parent._last_readback = time.monotonic()
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if walk.item.attr_name.startswith("an_ch"):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
new_val = values[idx]
walk.item._metadata["timestamp"] = timestamp
walk.item._readback = new_val
updates[walk.item.attr_name] = (new_val, old_val)
def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
"""
Helper method to create a dictionary of analog channel definitions for the DynamicDeviceComponent.
# Run subscriptions after all readbacks have been updated
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=timestamp,
)
Args:
num_channels (int): The number of analog channels to create.
"""
an_channels = {}
for i in range(0, num_channels):
an_channels[f"ch{i}"] = (
GalilRIOAnalogSignalRO,
f"ch{i}",
{"kind": Kind.normal, "channel": i, "doc": f"Analog channel {i}."},
)
return an_channels
def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
"""
Helper method to create a dictionary of digital output channel definitions for the DynamicDeviceComponent.
Args:
num_channels (int): The number of digital output channels to create.
"""
di_out_channels = {}
for i in range(0, num_channels):
di_out_channels[f"ch{i}"] = (
GalilRIODigitalOutSignal,
f"ch{i}",
{"kind": Kind.config, "channel": i, "doc": f"Digital output channel {i}."},
)
return di_out_channels
class GalilRIO(PSIDeviceBase):
"""
Galil RIO controller integration with 8 analog input channels. To implement the device,
please provide the appropriate host and port (default port is 23).
Galil RIO controller integration with 16 digital output channels and 8 analog input channels.
The default port for the controller is 23.
Args:
host (str): Hostname or IP address of the Galil RIO controller.
port (int, optional): Port number for the TCP/IP connection. Defaults to 23.
socket_cls (type[SocketIO], optional): Socket class to use for communication. Defaults to SocketIO.
scan_info (ScanInfo, optional): ScanInfo object for the device.
device_manager (DeviceManagerDS): The device manager instance that manages this device.
**kwargs: Additional keyword arguments passed to the PSIDeviceBase constructor.
"""
SUB_CONNECTION_CHANGE = "connection_change"
an_ch0 = Cpt(GalilRIOSignalRO, signal_name="an_ch0", channel=0, doc="Analog input channel 0")
an_ch1 = Cpt(GalilRIOSignalRO, signal_name="an_ch1", channel=1, doc="Analog input channel 1")
an_ch2 = Cpt(GalilRIOSignalRO, signal_name="an_ch2", channel=2, doc="Analog input channel 2")
an_ch3 = Cpt(GalilRIOSignalRO, signal_name="an_ch3", channel=3, doc="Analog input channel 3")
an_ch4 = Cpt(GalilRIOSignalRO, signal_name="an_ch4", channel=4, doc="Analog input channel 4")
an_ch5 = Cpt(GalilRIOSignalRO, signal_name="an_ch5", channel=5, doc="Analog input channel 5")
an_ch6 = Cpt(GalilRIOSignalRO, signal_name="an_ch6", channel=6, doc="Analog input channel 6")
an_ch7 = Cpt(GalilRIOSignalRO, signal_name="an_ch7", channel=7, doc="Analog input channel 7")
#############################
### Analog input channels ###
#############################
analog_in = DDC(_create_analog_channels(GalilRIOAnalogSignalRO._NUM_ANALOG_CHANNELS))
digital_out = DDC(
_create_digital_output_channels(GalilRIODigitalOutSignal._NUM_DIGITAL_OUTPUT_CHANNELS)
)
def __init__(
self,
@@ -177,19 +275,18 @@ class GalilRIO(PSIDeviceBase):
if port is None:
port = 23 # Default port for Galil RIO controller
self.controller = GalilRIOController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
name=f"GalilRIOController_{name}",
socket_cls=socket_cls,
socket_host=host,
socket_port=port,
device_manager=device_manager,
)
self._last_readback: float = time.monotonic()
self._readback_metadata: dict[str, float] = {"last_readback": 0.0}
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
@property
def last_readback(self) -> float:
"""Return the time of the last readback from the controller."""
return self._last_readback
# pylint: disable=arguments-differ
def wait_for_connection(self, timeout: float = 30.0) -> None:
"""Wait for the RIO controller to be connected within timeout period."""
@@ -207,7 +304,7 @@ class GalilRIO(PSIDeviceBase):
if __name__ == "__main__":
HOST_NAME = "129.129.98.64"
HOST_NAME = "129.129.122.14"
from bec_server.device_server.tests.utils import DMMock
dm = DMMock()

View File

@@ -25,6 +25,34 @@ logger = bec_logger.logger
class LamniGalilController(GalilController):
# ============================================================
# Error status
# ============================================================
caperr_bits = {
0x01: "Cap1 outside expected left-stop range (early check)",
0x02: "Cap2 outside expected left-stop range (early check)",
0x04: "Cap1 too low during pressure-off check (near right boundary)",
0x08: "Cap2 too low during pressure-off check (near right boundary)",
0x10: "Cap1 exceeded allowed left-stop boundary during movement",
0x20: "Cap2 exceeded allowed left-stop boundary during movement (disabled in code)",
0x40: "Cap1 did not respond to test movement",
0x80: "Cap2 did not respond to test movement"
}
allaxrer_table = {
1: "Not all axes referenced after reference search",
2: "Pressure-loss emergency stop (pressure 14/15 active while motor C off)",
3: "Unexpected pressure OFF while soft-limits not yet set",
4: "Pressure valve mismatch (OUT13=0 but IN13=1)",
5: "Capacitive sensor boundary violations (caperr > 0)",
6: "Emergency Stop triggered (IN[5]=0)",
7: "Following error detected on one or more axes"
}
USER_ACCESS = [
"describe",
"show_running_threads",
@@ -37,6 +65,8 @@ class LamniGalilController(GalilController):
"get_motor_limit_switch",
"is_motor_on",
"all_axes_referenced",
"lamni_lights_off",
"lamni_lights_on"
]
def show_status_other(self):
@@ -60,6 +90,47 @@ class LamniGalilController(GalilController):
print("There is air pressure at the outer rotation radial.")
swver = float(self.socket_put_and_receive("MGswver"))
print(f"Lgalil LAMNI firmware version {swver:2.0f}.")
allaxref = int(float(self.socket_put_and_receive("MGallaxref")))
print(f"Error statuts:")
if allaxref == 1:
print(f"Allaxref = 1, all OK.")
else:
print(f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion.")
allaxrer = int(float(self.socket_put_and_receive("MGallaxrer")))
print("\nallaxrer =", allaxrer)
print(self.decode_allaxrer(allaxrer))
caperr = int(float(self.socket_put_and_receive("MGcaperr")))
print("\nDecoding caperr =", caperr)
self.visualize_caperr(caperr)
def decode_allaxrer(self, code: int) -> str:
"""Return human-readable meaning of allaxrer code."""
return self.allaxrer_table.get(code, "Unknown allaxrer code")
def visualize_caperr(self, mask: int):
"""Pretty-print a bitmask visualization for caperr."""
print("\n=== CAPERR BITMASK VISUALIZER ===")
print(f"Raw value: {mask} (0x{mask:02X})")
print("----------------------------------\n")
print("Bit | Hex | Active | Meaning")
print("----------------------------------")
for bit, meaning in self.caperr_bits.items():
active = "YES" if mask & bit else "no"
print(f"{bit:3d} | 0x{bit:02X} | {active:6} | {meaning}")
print("\nActive flags:")
active_flags = [meaning for bit, meaning in self.caperr_bits.items() if mask & bit]
if active_flags:
for f in active_flags:
print("", f)
else:
print(" (none)")
print("\n==================================\n")
def lamni_lights_off(self):
self.socket_put_confirmed("SB1")
@@ -93,7 +164,7 @@ class LamniGalilReadbackSignal(GalilSignalRO):
val = super().read()
if self.parent.axis_Id_numeric == 2:
try:
rt = self.parent.device_manager.devices[self.parent.rtx]
rt = self.parent.device_manager.devices[self.parent.rt]
if rt.enabled:
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"])
except KeyError:
@@ -147,7 +218,7 @@ class LamniGalilMotor(Device, PositionerBase):
raise BECConfigError(
"device_mapping has been specified but the device_manager cannot be accessed."
)
self.rt = self.device_mapping.get("rt")
self.rt = self.device_mapping.get("rt", "rtx")
super().__init__(
prefix,

View File

@@ -11,6 +11,7 @@ from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
from csaxs_bec.devices.omny.rt.rt_ophyd import RtCommunicationError, RtError
@@ -51,6 +52,7 @@ class RtLamniController(Controller):
_axes_per_controller = 3
USER_ACCESS = [
"socket_put_and_receive",
"socket_put",
"set_rotation_angle",
"feedback_disable",
"feedback_enable_without_reset",
@@ -62,6 +64,9 @@ class RtLamniController(Controller):
"_set_axis_velocity_maximum_speed",
"_position_sampling_single_read",
"_position_sampling_single_reset_and_start_sampling",
"show_signal_strength_interferometer",
"show_analog_signals",
"show_feedback_status",
]
def __init__(
@@ -208,8 +213,9 @@ class RtLamniController(Controller):
@threadlocked
def start_scan(self):
interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
if interferometer_feedback_not_running == 1:
# interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
# if interferometer_feedback_not_running == 1:
if not self.feedback_is_running():
logger.error(
"Cannot start scan because feedback loop is not running or there is an interferometer error."
)
@@ -270,6 +276,44 @@ class RtLamniController(Controller):
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
}
return signals
def feedback_is_running(self) -> bool:
status = int(float((self.socket_put_and_receive("J2")).split(",")[0]))
return status == 0 # 0 means running, 1 means error/disabled
def show_feedback_status(self):
if self.feedback_is_running():
print("Loop is running, no error on interferometer.")
else:
print("Loop is not running, either it is turned off or an interferometer error occurred.")
def show_analog_signals(self) -> dict:
self.socket_put("As") # start sampling
time.sleep(0.01)
return_table = (self.socket_put_and_receive("Ar")).split(",")
number_of_samples = int(float(return_table[0]))
signals = {
"number_of_samples": number_of_samples,
"piezo_0": float(return_table[1]),
"piezo_1": float(return_table[2]),
"cap_0": float(return_table[3]),
"cap_1": float(return_table[4]),
"cap_2": float(return_table[5]),
"cap_3": float(return_table[6]),
"cap_4": float(return_table[7]),
}
t = PrettyTable()
t.title = f"LamNI Analog Signals ({number_of_samples} samples)"
t.field_names = ["Signal", "Value"]
for key, val in signals.items():
if key != "number_of_samples":
t.add_row([key, f"{val:.4f}"])
print(t)
return
def read_positions_from_sampler(self):
# this was for reading after the scan completed
@@ -347,6 +391,48 @@ class RtLamniController(Controller):
)
return bool(return_table[0])
def show_signal_strength_interferometer(self):
# trigger SSI averaging before reading
self.socket_put("J3")
time.sleep(0.05)
return_table = (self.socket_put_and_receive("J2")).split(",")
ssi_0 = float(return_table[1])
ssi_1 = float(return_table[2])
return_table_angle = (self.socket_put_and_receive("J7")).split(",")
angle_running = bool(int(float(return_table_angle[0])))
angle_position = float(return_table_angle[1])
angle_signal = float(return_table_angle[2])
t = PrettyTable()
t.title = "Interferometer signal strength"
t.field_names = ["Axis", "Description", "Value", "Running"]
t.add_row([0, "ST FZP horizontal", ssi_0, "-"])
t.add_row([1, "ST FZP vertical", ssi_1, "-"])
t.add_row([2, "Angle interferometer", angle_signal, angle_running])
print(t)
if angle_running:
print(f"Angle interferometer position: {angle_position:.4f} um")
else:
print("Warning: angle interferometer is not running.")
def show_interferometer_positions(self) -> dict:
return_table = (self.socket_put_and_receive("J4")).split(",")
loop_status = bool(int(float(return_table[0])))
pos_y = float(return_table[1])
pos_x = float(return_table[2])
t = PrettyTable()
t.title = "LamNI Interferometer Positions"
t.field_names = ["Axis", "Description", "Position (um)"]
t.add_row([0, "X", f"{pos_x:.4f}"])
t.add_row([1, "Y", f"{pos_y:.4f}"])
print(t)
print(f"Feedback loop running: {loop_status}")
return {"x": pos_x, "y": pos_y, "loop_running": loop_status}
def feedback_enable_with_reset(self):
if not self.feedback_status_angle_lamni():
self.feedback_disable_and_even_reset_lamni_angle_interferometer()

View File

@@ -0,0 +1,103 @@
"""Module to integrate the PandaBox for cSAXS measurements."""
import time
from bec_lib.logger import bec_logger
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
logger = bec_logger.logger
class PandaBoxCSAXS(PandaBox):
"""
PandaBox integration for cSAXS. This class implements cSAXS specific logic for the PandaBox integration.
TODO: This logic is not yet mapped to any existing hardware. Adapt Docstring once the hardware is defined and integrated.
"""
def on_init(self):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
def on_stage(self):
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
logger.info(f"PandaBox {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_complete(self):
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
def _check_capture_complete():
captured = 0
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
captured = int(ret[0].split("=")[-1])
time.sleep(0.01)
if (time.monotonic() - start_time) > self._timeout_on_completed / 2:
logger.info(
f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
)
if (time.monotonic() - start_time) > self._timeout_on_completed:
raise TimeoutError(
f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
)
finally:
self._disarm()
status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
self.cancel_on_stop(status_captured)
return status_captured
if __name__ == "__main__":
import time
panda = PandaBoxCSAXS(
name="omny_panda",
host="omny-panda.psi.ch",
signal_alias={
"FMC_IN.VAL2.Value": "alias",
"FMC_IN.VAL1.Min": "alias2",
"FMC_IN.VAL1.Max": "alias3",
"FMC_IN.VAL1.Mean": "alias4",
},
)
panda.on_connected()
status = StatusBase(obj=panda)
panda.add_status_callback(
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
)
panda.stop()
status.wait(timeout=2)
panda.unstage()
logger.info(f"Panda connected")
ret = panda.stage()
logger.info(f"Panda staged")
ret = panda.pre_scan()
ret.wait(timeout=5)
logger.info(f"Panda pre scan done")
time.sleep(5)
panda.stop()
st = panda.complete()
st.wait(timeout=5)
logger.info(f"Measurement completed")
panda.unstage()
logger.info(f"Panda Unstaged")

View File

@@ -0,0 +1,99 @@
"""Module to integrate the PandaBox for cSAXS measurements."""
import time
from bec_lib.logger import bec_logger
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
logger = bec_logger.logger
class PandaBoxOMNY(PandaBox):
"""PandaBox integration for OMNY. This class implements OMNY specific logic for the PandaBox integration."""
def on_init(self):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
def on_stage(self):
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
logger.info(f"PandaBox {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_complete(self):
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
def _check_capture_complete():
captured = 0
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
captured = int(ret[0].split("=")[-1])
time.sleep(0.01)
if (time.monotonic() - start_time) > self._timeout_on_completed / 2:
logger.info(
f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
)
if (time.monotonic() - start_time) > self._timeout_on_completed:
raise TimeoutError(
f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
)
finally:
self._disarm()
status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
self.cancel_on_stop(status_captured)
return status_captured
if __name__ == "__main__":
import time
panda = PandaBoxOMNY(
name="omny_panda",
host="omny-panda.psi.ch",
signal_alias={
"FMC_IN.VAL2.Value": "alias",
"FMC_IN.VAL1.Min": "alias2",
"FMC_IN.VAL1.Max": "alias3",
"FMC_IN.VAL1.Mean": "alias4",
},
)
panda.on_connected()
status = StatusBase(obj=panda)
panda.add_status_callback(
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
)
panda.stop()
status.wait(timeout=2)
panda.unstage()
logger.info(f"Panda connected")
ret = panda.stage()
logger.info(f"Panda staged")
ret = panda.pre_scan()
ret.wait(timeout=5)
logger.info(f"Panda pre scan done")
time.sleep(5)
panda.stop()
st = panda.complete()
st.wait(timeout=5)
logger.info(f"Measurement completed")
panda.unstage()
logger.info(f"Panda Unstaged")

View File

@@ -210,7 +210,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
def __init__(self, *args, parameter: dict = None, frames_per_trigger:int=1, exp_time:float=0,**kwargs):
"""
A LamNI scan following Fermat's spiral.
@@ -230,10 +230,10 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
Examples:
>>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1)
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1)
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1, frames_per_trigger=1)
"""
super().__init__(parameter=parameter, **kwargs)
super().__init__(parameter=parameter, frames_per_trigger=frames_per_trigger, exp_time=exp_time,**kwargs)
self.axis = []
scan_kwargs = parameter.get("kwargs", {})
self.fov_size = scan_kwargs.get("fov_size")
@@ -482,6 +482,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
yield from self.open_scan()
yield from self.stage()
yield from self.run_baseline_reading()
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()

View File

@@ -52,6 +52,7 @@ class FlomniFermatScan(SyncFlyScanBase):
angle: float = None,
corridor_size: float = 3,
parameter: dict = None,
frames_per_trigger:int=1,
**kwargs,
):
"""
@@ -62,7 +63,8 @@ class FlomniFermatScan(SyncFlyScanBase):
fovy(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 100 um
cenx(float) [um]: center position in x.
ceny(float) [um]: center position in y.
exp_time(float) [s]: exposure time
exp_time(float) [s]: exposure time per burst frame
frames_per_trigger(int) : Number of burst frames per point
step(float) [um]: stepsize
zshift(float) [um]: shift in z
angle(float) [deg]: rotation angle (will rotate first)
@@ -71,10 +73,10 @@ class FlomniFermatScan(SyncFlyScanBase):
Returns:
Examples:
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)
"""
super().__init__(parameter=parameter, exp_time=exp_time, **kwargs)
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
self.show_live_table = False
self.axis = []
self.fovx = fovx
@@ -323,6 +325,7 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.stage()
yield from self.run_baseline_reading()
yield from self._prepare_setup_part2()
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()

View File

@@ -1,4 +1,4 @@
""" Module with JungfrauJochTestScan class. """
"""Module with JungfrauJochTestScan class."""
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion

View File

@@ -51,6 +51,7 @@ class OMNYFermatScan(SyncFlyScanBase):
angle: float = None,
corridor_size: float = 3,
parameter: dict = None,
frames_per_trigger:int=1,
**kwargs,
):
"""
@@ -62,6 +63,7 @@ class OMNYFermatScan(SyncFlyScanBase):
cenx(float) [um]: center position in x.
ceny(float) [um]: center position in y.
exp_time(float) [s]: exposure time
frames_per_trigger:int: Number of burst frames per trigger, defaults to 1.
step(float) [um]: stepsize
zshift(float) [um]: shift in z
angle(float) [deg]: rotation angle (will rotate first)
@@ -73,7 +75,7 @@ class OMNYFermatScan(SyncFlyScanBase):
>>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=10, ceny=0, zshift=0, angle=0, step=2, exp_time=0.01)
"""
super().__init__(parameter=parameter, **kwargs)
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
self.axis = []
self.fovx = fovx
self.fovy = fovy
@@ -299,6 +301,7 @@ class OMNYFermatScan(SyncFlyScanBase):
yield from self.stage()
yield from self.run_baseline_reading()
yield from self._prepare_setup_part2()
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()

View File

@@ -193,14 +193,15 @@ The basic scan function can be called by `scans.flomni_fermat_scan()` and offers
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
| cenx (float) | center position in x |
| ceny (float) | center position in y |
| exp_time (float) | exposure time |
| exp_time (float) | exposure time per frame |
| frames_per_trigger(int) | Number of burst frames per position |
| step (float) | stepsize |
| zshift (float) | shift in z |
| angle (float) | rotation angle (will rotate first) |
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
Example:
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)`
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
#### Overview of the alignment steps

View File

@@ -327,14 +327,15 @@ The basic scan function can be called by `scans.omny_fermat_scan()` and offers a
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
| cenx (float) | center position in x |
| ceny (float) | center position in y |
| exp_time (float) | exposure time |
| exp_time (float) | exposure time per frame |
| frames_per_trigger(int) | Number of burst frames per position |
| step (float) | stepsize |
| zshift (float) | shift in z |
| angle (float) | rotation angle (will rotate first) |
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
Example:
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)`
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
#### Overview of the alignment steps

View File

@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
name = "csaxs_bec"
version = "0.0.0"
description = "The cSAXS plugin repository for BEC"
requires-python = ">=3.10"
requires-python = ">=3.11"
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",

View File

@@ -1,14 +0,0 @@
"""
Conftest runs for all tests in this directory and subdirectories. Thereby, we know for
certain that the SocketSignal.READBACK_TIMEOUT is set to 0 for all tests, which prevents
hanging tests when a readback is attempted on a non-connected socket.
"""
# conftest.py
import pytest
from ophyd_devices.utils.socket import SocketSignal
@pytest.fixture(autouse=True)
def patch_socket_timeout(monkeypatch):
monkeypatch.setattr(SocketSignal, "READBACK_TIMEOUT", 0.0)

View File

@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
lamni = LamNI(client)
align = XrayEyeAlign(client, lamni)
with mock.patch(
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
"csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
) as epics_put_mock:
align.save_frame()
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
def test_update_frame(bec_client_mock):
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen"
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase(
name="xeye",

View File

@@ -2,6 +2,7 @@ from unittest import mock
import pytest
from ophyd_devices.tests.utils import SocketMock
from ophyd_devices.utils.socket import SocketSignal
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
@@ -17,6 +18,11 @@ def fsamroy(dm_with_devices):
socket_cls=SocketMock,
device_manager=dm_with_devices,
)
for walk in fsamroy_motor.walk_signals():
if isinstance(walk.item, SocketSignal):
walk.item._readback_timeout = (
0.0 # Set the readback timeout to 0 to avoid waiting during tests
)
fsamroy_motor.controller.on()
assert isinstance(fsamroy_motor.controller, FuprGalilController)
yield fsamroy_motor

View File

@@ -9,7 +9,11 @@ from ophyd_devices.tests.utils import SocketMock
from csaxs_bec.devices.npoint.npoint import NPointAxis, NPointController
from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, FlomniGalilMotor
from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGalilMotor
from csaxs_bec.devices.omny.galil.galil_rio import GalilRIO, GalilRIOController, GalilRIOSignalRO
from csaxs_bec.devices.omny.galil.galil_rio import (
GalilRIO,
GalilRIOAnalogSignalRO,
GalilRIOController,
)
from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, LamniGalilMotor
from csaxs_bec.devices.omny.galil.ogalil_ophyd import OMNYGalilController, OMNYGalilMotor
from csaxs_bec.devices.omny.galil.sgalil_ophyd import GalilController, SGalilMotor
@@ -272,26 +276,27 @@ def test_galil_rio_signal_read(galil_rio):
## Test read of all channels
###########
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
assert galil_rio.analog_in.ch0._readback_timeout == 0.1 # Default read timeout of 100ms
# Mock the socket to return specific values
galil_rio.controller.sock.buffer_recv = [b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901"]
galil_rio._last_readback = 0 # Force read from controller
analog_bufffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
galil_rio.controller.sock.buffer_recv = [] # Clear any existing buffer
galil_rio.controller.sock.buffer_recv.append(analog_bufffer)
read_values = galil_rio.read()
assert len(read_values) == 8 # 8 channels
expected_values = {
galil_rio.an_ch0.name: {"value": 1.234},
galil_rio.an_ch1.name: {"value": 2.345},
galil_rio.an_ch2.name: {"value": 3.456},
galil_rio.an_ch3.name: {"value": 4.567},
galil_rio.an_ch4.name: {"value": 5.678},
galil_rio.an_ch5.name: {"value": 6.789},
galil_rio.an_ch6.name: {"value": 7.890},
galil_rio.an_ch7.name: {"value": 8.901},
galil_rio.analog_in.ch0.name: {"value": 1.234},
galil_rio.analog_in.ch1.name: {"value": 2.345},
galil_rio.analog_in.ch2.name: {"value": 3.456},
galil_rio.analog_in.ch3.name: {"value": 4.567},
galil_rio.analog_in.ch4.name: {"value": 5.678},
galil_rio.analog_in.ch5.name: {"value": 6.789},
galil_rio.analog_in.ch6.name: {"value": 7.890},
galil_rio.analog_in.ch7.name: {"value": 8.901},
}
# All timestamps should be the same
assert all(
ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"]
ret["timestamp"] == read_values[galil_rio.analog_in.ch0.name]["timestamp"]
for signal_name, ret in read_values.items()
)
# Check values
@@ -301,7 +306,7 @@ def test_galil_rio_signal_read(galil_rio):
# Check communication command to socker
assert galil_rio.controller.sock.buffer_put == [
b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
b"MG@AN[0], @AN[1], @AN[2], @AN[3], @AN[4], @AN[5], @AN[6], @AN[7]\r"
]
###########
@@ -313,11 +318,11 @@ def test_galil_rio_signal_read(galil_rio):
def value_callback(value, old_value, **kwargs):
obj = kwargs.get("obj")
galil = obj.parent
galil = obj.parent.parent
readback = galil.read()
value_callback_buffer.append(readback)
galil_rio.an_ch0.subscribe(value_callback, run=False)
galil_rio.analog_in.ch0.subscribe(value_callback, run=False)
galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]
@@ -327,13 +332,15 @@ def test_galil_rio_signal_read(galil_rio):
# Should have used the cached value
for walk in galil_rio.walk_signals():
walk.item._READ_TIMEOUT = 10 # Make sure cached read is used
ret = galil_rio.an_ch0.read()
walk.item._readback_timeout = 10 # Make sure cached read is used
ret = galil_rio.analog_in.ch0.read()
# Should not trigger callback since value did not change
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234)
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 1.234)
# Same timestamp as for another channel as this is cached read
assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp)
assert np.isclose(
ret[galil_rio.analog_in.ch0.name]["timestamp"], galil_rio.analog_in.ch7.timestamp
)
assert len(value_callback_buffer) == 0
##################
@@ -341,10 +348,10 @@ def test_galil_rio_signal_read(galil_rio):
##################
# Now force a read from the controller
galil_rio._last_readback = 0 # Force read from controller
ret = galil_rio.an_ch0.read()
galil_rio.analog_in.ch0._last_readback = 0 # Force read from controller
ret = galil_rio.analog_in.ch0.read()
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5)
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 2.5)
# Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
assert len(value_callback_buffer) == 1
@@ -352,7 +359,45 @@ def test_galil_rio_signal_read(galil_rio):
assert np.isclose(values, expected_values).all()
assert all(
[
value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"]
value["timestamp"]
== value_callback_buffer[0][galil_rio.analog_in.ch0.name]["timestamp"]
for value in value_callback_buffer[0].values()
]
)
def test_galil_rio_digital_out_signal(galil_rio):
"""
Test that the Galil RIO digital output signal can be set correctly.
"""
## Test Read from digital output channels
buffer_receive = []
excepted_put_buffer = []
for ii in range(galil_rio.digital_out.ch0._NUM_DIGITAL_OUTPUT_CHANNELS):
cmd = f"MG@OUT[{ii}]\r".encode()
excepted_put_buffer.append(cmd)
recv = " 1.000".encode()
buffer_receive.append(recv)
galil_rio.controller.sock.buffer_recv = buffer_receive # Mock response for readback
digital_read = galil_rio.read_configuration() # Read to populate readback values
for walk in galil_rio.digital_out.walk_signals():
assert np.isclose(digital_read[walk.item.name]["value"], 1.0)
assert galil_rio.controller.sock.buffer_put == excepted_put_buffer
# Test writing to digital output channels
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
# Set digital output channel 0 to high
galil_rio.digital_out.ch0.put(1)
assert galil_rio.controller.sock.buffer_put == [b"SB0\r"]
# Set digital output channel 0 to low
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
galil_rio.digital_out.ch0.put(0)
assert galil_rio.controller.sock.buffer_put == [b"CB0\r"]

View File

@@ -217,6 +217,16 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
assert not mcs._start_monitor_async_data_emission.is_set()
def test_mcs_on_stop(mock_mcs_csaxs: MCSCardCSAXS):
"""Test that on stop sets the omit_mca_callbacks flag. Also test that on stage clears the omit_mca_callbacks flag."""
mcs = mock_mcs_csaxs
assert mcs._omit_mca_callbacks.is_set() is False
mcs.stop()
assert mcs._omit_mca_callbacks.is_set() is True
mcs.stage()
assert mcs._omit_mca_callbacks.is_set() is False
def test_mcs_recovery(mock_mcs_csaxs: MCSCardCSAXS):
mcs = mock_mcs_csaxs
# Simulate ongoing acquisition

View File

@@ -0,0 +1,190 @@
"""Module for testing the PandaBoxCSAXS and PandaBoxOMNY devices."""
# pylint: skip-file
from __future__ import annotations
from unittest import mock
import pytest
from ophyd import Staged
from csaxs_bec.devices.panda_box.panda_box import PandaBoxCSAXS
from csaxs_bec.devices.panda_box.panda_box_omny import PandaBoxOMNY
@pytest.fixture
def panda_omny():
dev_name = "panda_omny"
dev = PandaBoxOMNY(
name=dev_name,
host="omny-panda-box.psi.ch",
signal_alias={
"FMC_IN.VAL1.Min": "cap_voltage_fzp_y_min",
"FMC_IN.VAL1.Max": "cap_voltage_fzp_y_max",
"FMC_IN.VAL1.Mean": "cap_voltage_fzp_y_mean",
"FMC_IN.VAL2.Min": "cap_voltage_fzp_x_min",
"FMC_IN.VAL2.Max": "cap_voltage_fzp_x_max",
"FMC_IN.VAL2.Mean": "cap_voltage_fzp_x_mean",
},
)
yield dev
@pytest.fixture
def panda_csaxs():
dev_name = "panda_csaxs"
dev = PandaBoxCSAXS(name=dev_name, host="csaxs-panda-box.psi.ch")
yield dev
def test_panda_omny(panda_omny):
assert panda_omny.name == "panda_omny"
assert panda_omny.host == "omny-panda-box.psi.ch"
all_signal_names = [name for name, _ in panda_omny.data.signals]
# Check that the signal aliases are correctly set up
assert "cap_voltage_fzp_y_min" in all_signal_names
assert "cap_voltage_fzp_y_max" in all_signal_names
assert "cap_voltage_fzp_y_mean" in all_signal_names
assert "cap_voltage_fzp_x_min" in all_signal_names
assert "cap_voltage_fzp_x_max" in all_signal_names
assert "cap_voltage_fzp_x_mean" in all_signal_names
# Check that the original signal names are not present
assert "FMC_IN.VAL1.Min" not in all_signal_names
assert "FMC_IN.VAL1.Max" not in all_signal_names
assert "FMC_IN.VAL1.Mean" not in all_signal_names
assert "FMC_IN.VAL2.Min" not in all_signal_names
assert "FMC_IN.VAL2.Max" not in all_signal_names
assert "FMC_IN.VAL2.Mean" not in all_signal_names
assert panda_omny._acquisition_group == "burst"
assert panda_omny._timeout_on_completed == 10
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
],
)
def test_panda_omny_stage(panda_omny, scan_type, frames_per_trigger, expected_acquisition_group):
# Check that the stage signal is present and has the correct PV
assert len(panda_omny._status_callbacks) == 0
panda_omny.scan_info.msg.scan_type = scan_type
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_omny.stage()
assert panda_omny._acquisition_group == expected_acquisition_group
assert panda_omny.staged == Staged.yes
def test_panda_omny_complete(panda_omny):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_omny.scan_info.msg.num_points = 1
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_omny._timeout_on_completed = 0.5 # Set a short timeout for testing
def _mock_return_captured(*args, **kwargs):
return ["=0"]
# Timeout Error on complete
with (
mock.patch.object(panda_omny, "send_raw", side_effect=_mock_return_captured),
mock.patch.object(panda_omny, "_disarm", return_value=None) as mock_disarm,
):
status = panda_omny.on_complete()
assert status.done is False
assert status.success is False
with pytest.raises(TimeoutError):
status.wait(timeout=4)
mock_disarm.assert_called_once()
# Successful complete
panda_omny._timeout_on_completed = 5
with (
mock.patch.object(panda_omny, "send_raw", side_effect=[["=0"], ["=0"], ["=1"]]),
mock.patch.object(panda_omny, "_disarm", return_value=None) as mock_disarm,
):
status = panda_omny.on_complete()
assert status.done is False
assert status.success is False
status.wait(timeout=4)
mock_disarm.assert_called_once()
assert status.done is True
assert status.success is True
def test_panda_csaxs(panda_csaxs):
assert panda_csaxs.name == "panda_csaxs"
assert panda_csaxs.host == "csaxs-panda-box.psi.ch"
assert panda_csaxs._acquisition_group == "burst"
assert panda_csaxs._timeout_on_completed == 10
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
],
)
def test_panda_csaxs_stage(panda_csaxs, scan_type, frames_per_trigger, expected_acquisition_group):
"""Test the on_stage method of the PandaBoxCSAXS device for different scan types and frames per trigger."""
assert len(panda_csaxs._status_callbacks) == 0
panda_csaxs.scan_info.msg.scan_type = scan_type
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_csaxs.stage()
assert panda_csaxs._acquisition_group == expected_acquisition_group
assert panda_csaxs.staged == Staged.yes
def test_panda_csaxs_complete(panda_csaxs):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_csaxs.scan_info.msg.num_points = 1
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_csaxs._timeout_on_completed = 0.5 # Set a short timeout for testing
def _mock_return_captured(*args, **kwargs):
return ["=0"]
# Timeout Error on complete
with (
mock.patch.object(panda_csaxs, "send_raw", side_effect=_mock_return_captured),
mock.patch.object(panda_csaxs, "_disarm", return_value=None) as mock_disarm,
):
status = panda_csaxs.on_complete()
assert status.done is False
assert status.success is False
with pytest.raises(TimeoutError):
status.wait(timeout=4)
mock_disarm.assert_called_once()
# Successful complete
panda_csaxs._timeout_on_completed = 5
with (
mock.patch.object(panda_csaxs, "send_raw", side_effect=[["=0"], ["=0"], ["=1"]]),
mock.patch.object(panda_csaxs, "_disarm", return_value=None) as mock_disarm,
):
status = panda_csaxs.on_complete()
assert status.done is False
assert status.success is False
status.wait(timeout=4)
mock_disarm.assert_called_once()
assert status.done is True
assert status.success is True

View File

@@ -302,6 +302,36 @@ def device_manager_mock():
action="set",
parameter={"value": 2.1508313829565293},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
action="pre_scan",
parameter={},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="set",
parameter={"value": 1.3681828686580249},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="set",
parameter={"value": 2.1508313829565293},
),
None,
messages.DeviceInstructionMessage(
metadata={