Compare commits
23 Commits
refactor/m
...
fix/mcs-ca
| Author | SHA1 | Date | |
|---|---|---|---|
| c7db607aab | |||
| 4721ec404b | |||
|
4d69f8f90f
|
|||
| 0f072a786e | |||
| 05a1e3d8be | |||
| e9fd9084b8 | |||
| 40ef387134 | |||
|
|
6ed84664f2 | ||
|
|
e5e3343da7 | ||
|
|
c8866faccc | ||
|
|
3b561c251c | ||
|
|
bc187040ad | ||
|
|
efd27a27e8 | ||
|
|
7096ef3323 | ||
| 13378f24dd | |||
|
|
f5b898ea1c
|
||
|
3d62bea04b
|
|||
|
1518845d25
|
|||
|
ff3b6686db
|
|||
| afdc64e296 | |||
| bc31c00e1f | |||
|
|
38671f074e | ||
|
|
92e39a5f75 |
@@ -2,7 +2,7 @@
|
||||
# It is needed to track the repo template version, and editing may break things.
|
||||
# This file will be overwritten by copier on template updates.
|
||||
|
||||
_commit: v1.2.2
|
||||
_commit: v1.2.8
|
||||
_src_path: https://github.com/bec-project/plugin_copier_template.git
|
||||
make_commit: false
|
||||
project_name: csaxs_bec
|
||||
|
||||
@@ -28,7 +28,7 @@ on:
|
||||
description: "Python version to use"
|
||||
required: false
|
||||
type: string
|
||||
default: "3.11"
|
||||
default: "3.12"
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
@@ -44,7 +44,19 @@ jobs:
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
|
||||
python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
|
||||
|
||||
- name: Checkout BEC Plugin Repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/csaxs_bec
|
||||
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
|
||||
path: ./csaxs_bec
|
||||
|
||||
- name: Lint for merge conflicts from template updates
|
||||
shell: bash
|
||||
# Find all Copier conflicts except this line
|
||||
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
|
||||
|
||||
- name: Checkout BEC Core
|
||||
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec
|
||||
@@ -55,13 +67,6 @@ jobs:
|
||||
- name: Checkout BEC Widgets
|
||||
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets
|
||||
|
||||
- name: Checkout BEC Plugin Repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/csaxs_bec
|
||||
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
|
||||
path: ./csaxs_bec
|
||||
|
||||
- name: Install dependencies
|
||||
shell: bash
|
||||
run: |
|
||||
|
||||
62
.gitea/workflows/create_update_pr.yml
Normal file
62
.gitea/workflows/create_update_pr.yml
Normal file
@@ -0,0 +1,62 @@
|
||||
name: Create template upgrade PR for csaxs_bec
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
create_update_branch_and_pr:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
|
||||
steps:
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.12'
|
||||
|
||||
- name: Install tools
|
||||
run: |
|
||||
pip install copier PySide6
|
||||
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Perform update
|
||||
run: |
|
||||
git config --global user.email "bec_ci_staging@psi.ch"
|
||||
git config --global user.name "BEC automated CI"
|
||||
|
||||
branch="chore/update-template-$(python -m uuid)"
|
||||
echo "switching to branch $branch"
|
||||
git checkout -b $branch
|
||||
|
||||
echo "Running copier update..."
|
||||
output="$(copier update --trust --defaults --conflict inline 2>&1)"
|
||||
echo "$output"
|
||||
msg="$(printf '%s\n' "$output" | head -n 1)"
|
||||
|
||||
if ! grep -q "make_commit: true" .copier-answers.yml ; then
|
||||
echo "Autocommit not made, committing..."
|
||||
git add -A
|
||||
git commit -a -m "$msg"
|
||||
fi
|
||||
|
||||
if diff-index --quiet HEAD ; then
|
||||
echo "No changes detected"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
git push -u origin $branch
|
||||
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
|
||||
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{
|
||||
\"title\": \"Template: $(echo $msg)\",
|
||||
\"body\": \"This PR was created by Gitea Actions\",
|
||||
\"head\": \"$(echo $branch)\",
|
||||
\"base\": \"main\"
|
||||
}"
|
||||
@@ -1,20 +0,0 @@
|
||||
include:
|
||||
- project: bec/awi_utils
|
||||
file: /templates/plugin-repo-template.yml
|
||||
inputs:
|
||||
name: "csaxs"
|
||||
target: "csaxs_bec"
|
||||
branch: $CHILD_PIPELINE_BRANCH
|
||||
|
||||
pages:
|
||||
stage: Deploy
|
||||
needs: []
|
||||
variables:
|
||||
TARGET_BRANCH: $CI_COMMIT_REF_NAME
|
||||
rules:
|
||||
- if: "$CI_COMMIT_TAG != null"
|
||||
variables:
|
||||
TARGET_BRANCH: $CI_COMMIT_TAG
|
||||
- if: '$CI_COMMIT_REF_NAME == "main" && $CI_PROJECT_PATH == "bec/csaxs_bec"'
|
||||
script:
|
||||
- curl -X POST -d "branches=$CI_COMMIT_REF_NAME" -d "token=$RTD_TOKEN" https://readthedocs.org/api/v2/webhook/sls-csaxs/270162/
|
||||
@@ -210,13 +210,11 @@ class LamNI(LamNIOpticsMixin):
|
||||
self.feedback_status()
|
||||
|
||||
def feedback_status(self):
|
||||
if self.device_manager.devices.rtx.controller.feedback_is_running():
|
||||
print("The rt feedback is \x1b[92mrunning\x1b[0m.")
|
||||
else:
|
||||
print("The rt feedback is \x1b[91mNOT\x1b[0m running.")
|
||||
self.device_manager.devices.rtx.controller.show_feedback_status()
|
||||
|
||||
|
||||
def show_interferometer_positions(self):
|
||||
self.device_manager.devices.rtx.controller.show_interferometer_positions()
|
||||
self.device_manager.devices.rtx.controller.show_feedback_status()
|
||||
|
||||
def show_signal_strength(self):
|
||||
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()
|
||||
|
||||
@@ -9,9 +9,11 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
|
||||
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
|
||||
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
bec = builtins.__dict__.get("bec")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
class LamNIInitError(Exception):
|
||||
pass
|
||||
|
||||
442
csaxs_bec/bec_ipython_client/plugins/cSAXS/cSAXSDLPCA200.py
Normal file
442
csaxs_bec/bec_ipython_client/plugins/cSAXS/cSAXSDLPCA200.py
Normal file
@@ -0,0 +1,442 @@
|
||||
"""
|
||||
csaxs_dlpca200.py
|
||||
=================
|
||||
BEC control script for FEMTO DLPCA-200 Variable Gain Low Noise Current Amplifiers
|
||||
connected to Galil RIO digital outputs.
|
||||
|
||||
DLPCA-200 Remote Control (datasheet page 4)
|
||||
-------------------------------------------
|
||||
Sub-D pin -> function:
|
||||
Pin 10 -> gain LSB (digital out channel, index 0 in bit-tuple)
|
||||
Pin 11 -> gain MID (digital out channel, index 1 in bit-tuple)
|
||||
Pin 12 -> gain MSB (digital out channel, index 2 in bit-tuple)
|
||||
Pin 13 -> coupling LOW = AC, HIGH = DC
|
||||
Pin 14 -> speed mode HIGH = low noise (Pin14=1), LOW = high speed (Pin14=0)
|
||||
|
||||
Gain truth table (MSB, MID, LSB):
|
||||
0,0,0 -> low-noise: 1e3 high-speed: 1e5
|
||||
0,0,1 -> low-noise: 1e4 high-speed: 1e6
|
||||
0,1,0 -> low-noise: 1e5 high-speed: 1e7
|
||||
0,1,1 -> low-noise: 1e6 high-speed: 1e8
|
||||
1,0,0 -> low-noise: 1e7 high-speed: 1e9
|
||||
1,0,1 -> low-noise: 1e8 high-speed: 1e10
|
||||
1,1,0 -> low-noise: 1e9 high-speed: 1e11
|
||||
|
||||
Strategy: prefer low-noise mode (1e3-1e9). For 1e10 and 1e11,
|
||||
automatically fall back to high-speed mode.
|
||||
|
||||
Device wiring example (galilrioesxbox):
|
||||
bpm4: Pin10->ch0, Pin11->ch1, Pin12->ch2, Pin13->ch3, Pin14->ch4
|
||||
bim: Pin10->ch6, Pin11->ch7, Pin12->ch8, Pin13->ch9, Pin14->ch10
|
||||
|
||||
Usage examples
|
||||
--------------
|
||||
csaxs_amp = cSAXSDLPCA200(client)
|
||||
|
||||
csaxs_amp.set_gain("bpm4", 1e7) # low-noise if possible
|
||||
csaxs_amp.set_gain("bim", 1e10) # auto high-speed
|
||||
csaxs_amp.set_coupling("bpm4", "DC")
|
||||
csaxs_amp.set_coupling("bim", "AC")
|
||||
csaxs_amp.info("bpm4") # print current settings
|
||||
csaxs_amp.info_all() # print all configured amplifiers
|
||||
"""
|
||||
|
||||
import builtins
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Amplifier registry
|
||||
# ---------------------------------------------------------------------------
|
||||
# Each entry describes one DLPCA-200 amplifier connected to a Galil RIO.
|
||||
#
|
||||
# Keys inside "channels":
|
||||
# gain_lsb -> digital output channel number wired to DLPCA-200 Pin 10
|
||||
# gain_mid -> digital output channel number wired to DLPCA-200 Pin 11
|
||||
# gain_msb -> digital output channel number wired to DLPCA-200 Pin 12
|
||||
# coupling -> digital output channel number wired to DLPCA-200 Pin 13
|
||||
# speed_mode -> digital output channel number wired to DLPCA-200 Pin 14
|
||||
#
|
||||
# To add a new amplifier, simply extend this dict.
|
||||
# ---------------------------------------------------------------------------
|
||||
DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
|
||||
"bpm4": {
|
||||
"rio_device": "galilrioesxbox",
|
||||
"description": "Beam Position Monitor 4 current amplifier",
|
||||
"channels": {
|
||||
"gain_lsb": 0, # Pin 10 -> Galil ch0
|
||||
"gain_mid": 1, # Pin 11 -> Galil ch1
|
||||
"gain_msb": 2, # Pin 12 -> Galil ch2
|
||||
"coupling": 3, # Pin 13 -> Galil ch3
|
||||
"speed_mode": 4, # Pin 14 -> Galil ch4
|
||||
},
|
||||
},
|
||||
"bim": {
|
||||
"rio_device": "galilrioesxbox",
|
||||
"description": "Beam Intensity Monitor current amplifier",
|
||||
"channels": {
|
||||
"gain_lsb": 6, # Pin 10 -> Galil ch6
|
||||
"gain_mid": 7, # Pin 11 -> Galil ch7
|
||||
"gain_msb": 8, # Pin 12 -> Galil ch8
|
||||
"coupling": 9, # Pin 13 -> Galil ch9
|
||||
"speed_mode": 10, # Pin 14 -> Galil ch10
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DLPCA-200 gain encoding tables
|
||||
# ---------------------------------------------------------------------------
|
||||
# (msb, mid, lsb) -> gain in V/A
|
||||
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
|
||||
(0, 0, 0): int(1e3),
|
||||
(0, 0, 1): int(1e4),
|
||||
(0, 1, 0): int(1e5),
|
||||
(0, 1, 1): int(1e6),
|
||||
(1, 0, 0): int(1e7),
|
||||
(1, 0, 1): int(1e8),
|
||||
(1, 1, 0): int(1e9),
|
||||
}
|
||||
|
||||
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
|
||||
(0, 0, 0): int(1e5),
|
||||
(0, 0, 1): int(1e6),
|
||||
(0, 1, 0): int(1e7),
|
||||
(0, 1, 1): int(1e8),
|
||||
(1, 0, 0): int(1e9),
|
||||
(1, 0, 1): int(1e10),
|
||||
(1, 1, 0): int(1e11),
|
||||
}
|
||||
|
||||
# Inverse maps: gain -> (msb, mid, lsb, low_noise_flag)
|
||||
# low_noise_flag: True = Pin14 HIGH, False = Pin14 LOW
|
||||
_GAIN_TO_BITS: dict[int, tuple] = {}
|
||||
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
|
||||
_GAIN_TO_BITS[_gain] = (*_bits, True)
|
||||
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
|
||||
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
|
||||
_GAIN_TO_BITS[_gain] = (*_bits, False)
|
||||
|
||||
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
|
||||
|
||||
|
||||
class cSAXSDLPCA200Error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class cSAXSDLPCA200:
|
||||
"""
|
||||
Control class for FEMTO DLPCA-200 current amplifiers connected via Galil RIO
|
||||
digital outputs in a BEC environment.
|
||||
|
||||
Supports:
|
||||
- Forward control: set_gain(), set_coupling()
|
||||
- Readback reporting: info(), info_all(), read_settings()
|
||||
- Robust error handling and logging following cSAXS conventions.
|
||||
"""
|
||||
|
||||
TAG = "[DLPCA200]"
|
||||
|
||||
def __init__(self, client, config: dict | None = None) -> None:
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
client : BEC client object (passed through for future use)
|
||||
config : optional override for DLPCA200_AMPLIFIER_CONFIG.
|
||||
Falls back to the module-level dict if not provided.
|
||||
"""
|
||||
self.client = client
|
||||
self._config: dict[str, dict] = config if config is not None else DLPCA200_AMPLIFIER_CONFIG
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _require_dev(self) -> None:
|
||||
if dev is None:
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} BEC 'dev' namespace is not available in this session."
|
||||
)
|
||||
|
||||
def _get_cfg(self, amp_name: str) -> dict:
|
||||
"""Return config dict for a named amplifier, raising on unknown names."""
|
||||
if amp_name not in self._config:
|
||||
known = ", ".join(sorted(self._config.keys()))
|
||||
raise cSAXSDLPCA200Error(f"{self.TAG} Unknown amplifier '{amp_name}'. Known: [{known}]")
|
||||
return self._config[amp_name]
|
||||
|
||||
def _get_rio(self, amp_name: str):
|
||||
"""Return the live RIO device object for a given amplifier."""
|
||||
self._require_dev()
|
||||
cfg = self._get_cfg(amp_name)
|
||||
rio_name = cfg["rio_device"]
|
||||
try:
|
||||
rio = getattr(dev, rio_name)
|
||||
except AttributeError:
|
||||
raise cSAXSDLPCA200Error(f"{self.TAG} RIO device '{rio_name}' not found in BEC 'dev'.")
|
||||
return rio
|
||||
|
||||
def _dout_get(self, rio, ch: int) -> int:
|
||||
"""Read one digital output channel (returns 0 or 1)."""
|
||||
attr = getattr(rio.digital_out, f"ch{ch}")
|
||||
val = attr.get()
|
||||
return int(val)
|
||||
|
||||
def _dout_set(self, rio, ch: int, value: bool) -> None:
|
||||
"""Write one digital output channel (True=HIGH=1, False=LOW=0)."""
|
||||
attr = getattr(rio.digital_out, f"ch{ch}")
|
||||
attr.set(value)
|
||||
|
||||
def _read_gain_bits(self, amp_name: str) -> tuple[int, int, int, int]:
|
||||
"""
|
||||
Read current gain bit-state from hardware.
|
||||
|
||||
Returns
|
||||
-------
|
||||
(msb, mid, lsb, speed_mode)
|
||||
speed_mode: 1 = low-noise (Pin14=HIGH), 0 = high-speed (Pin14=LOW)
|
||||
"""
|
||||
rio = self._get_rio(amp_name)
|
||||
ch = self._get_cfg(amp_name)["channels"]
|
||||
msb = self._dout_get(rio, ch["gain_msb"])
|
||||
mid = self._dout_get(rio, ch["gain_mid"])
|
||||
lsb = self._dout_get(rio, ch["gain_lsb"])
|
||||
speed_mode = self._dout_get(rio, ch["speed_mode"])
|
||||
return msb, mid, lsb, speed_mode
|
||||
|
||||
def _decode_gain(self, msb: int, mid: int, lsb: int, speed_mode: int) -> int | None:
|
||||
"""
|
||||
Decode hardware bit-state into gain value (V/A).
|
||||
|
||||
speed_mode=1 -> low-noise table, speed_mode=0 -> high-speed table.
|
||||
Returns None if the bit combination is not in the table.
|
||||
"""
|
||||
bits = (msb, mid, lsb)
|
||||
if speed_mode:
|
||||
return _GAIN_BITS_LOW_NOISE.get(bits)
|
||||
else:
|
||||
return _GAIN_BITS_HIGH_SPEED.get(bits)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public API - control
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def set_gain(self, amp_name: str, gain: float, force_high_speed: bool = False) -> None:
|
||||
"""
|
||||
Set the transimpedance gain of a DLPCA-200 amplifier.
|
||||
|
||||
The method automatically selects low-noise mode (Pin14=HIGH) whenever
|
||||
the requested gain is achievable in low-noise mode (1e3 - 1e9 V/A).
|
||||
For gains of 1e10 and 1e11 V/A, high-speed mode is used automatically.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
amp_name : str
|
||||
Amplifier name as defined in DLPCA200_AMPLIFIER_CONFIG (e.g. "bpm4").
|
||||
gain : float or int
|
||||
Target gain in V/A. Must be one of:
|
||||
1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11.
|
||||
force_high_speed : bool, optional
|
||||
If True, force high-speed (low-noise=False) mode even for gains
|
||||
below 1e10. Default: False (prefer low-noise).
|
||||
|
||||
Examples
|
||||
--------
|
||||
csaxs_amp.set_gain("bpm4", 1e7) # low-noise mode (automatic)
|
||||
csaxs_amp.set_gain("bim", 1e10) # high-speed mode (automatic)
|
||||
csaxs_amp.set_gain("bpm4", 1e7, force_high_speed=True) # override to high-speed
|
||||
"""
|
||||
gain_int = int(gain)
|
||||
if gain_int not in _GAIN_TO_BITS:
|
||||
valid_str = ", ".join(
|
||||
f"1e{int(round(__import__('math').log10(g)))}" for g in VALID_GAINS
|
||||
)
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Invalid gain {gain:.2e} V/A for '{amp_name}'. "
|
||||
f"Valid values: {valid_str}"
|
||||
)
|
||||
|
||||
msb, mid, lsb, low_noise_preferred = _GAIN_TO_BITS[gain_int]
|
||||
|
||||
# Apply force_high_speed override
|
||||
if force_high_speed and low_noise_preferred:
|
||||
# Check if this gain is achievable in high-speed mode
|
||||
hs_entry = next(
|
||||
(bits for bits, g in _GAIN_BITS_HIGH_SPEED.items() if g == gain_int), None
|
||||
)
|
||||
if hs_entry is None:
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Gain {gain:.2e} V/A is not achievable in high-speed mode "
|
||||
f"for '{amp_name}'."
|
||||
)
|
||||
msb, mid, lsb = hs_entry
|
||||
low_noise_preferred = False
|
||||
|
||||
use_low_noise = low_noise_preferred and not force_high_speed
|
||||
|
||||
try:
|
||||
rio = self._get_rio(amp_name)
|
||||
ch = self._get_cfg(amp_name)["channels"]
|
||||
|
||||
self._dout_set(rio, ch["gain_msb"], bool(msb))
|
||||
self._dout_set(rio, ch["gain_mid"], bool(mid))
|
||||
self._dout_set(rio, ch["gain_lsb"], bool(lsb))
|
||||
self._dout_set(rio, ch["speed_mode"], use_low_noise) # True=low-noise
|
||||
|
||||
mode_str = "low-noise" if use_low_noise else "high-speed"
|
||||
logger.info(
|
||||
f"{self.TAG} [{amp_name}] gain set to {gain_int:.2e} V/A "
|
||||
f"({mode_str} mode, bits MSB={msb} MID={mid} LSB={lsb})"
|
||||
)
|
||||
print(
|
||||
f"{amp_name}: gain -> {gain_int:.2e} V/A [{mode_str}] "
|
||||
f"(bits: MSB={msb} MID={mid} LSB={lsb})"
|
||||
)
|
||||
|
||||
except cSAXSDLPCA200Error:
|
||||
raise
|
||||
except Exception as exc:
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Failed to set gain on '{amp_name}': {exc}"
|
||||
) from exc
|
||||
|
||||
def set_coupling(self, amp_name: str, coupling: str) -> None:
|
||||
"""
|
||||
Set AC or DC coupling on a DLPCA-200 amplifier.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
amp_name : str
|
||||
Amplifier name (e.g. "bpm4", "bim").
|
||||
coupling : str
|
||||
"AC" or "DC" (case-insensitive).
|
||||
DC -> Pin13 HIGH, AC -> Pin13 LOW.
|
||||
|
||||
Examples
|
||||
--------
|
||||
csaxs_amp.set_coupling("bpm4", "DC")
|
||||
csaxs_amp.set_coupling("bim", "AC")
|
||||
"""
|
||||
coupling_upper = coupling.strip().upper()
|
||||
if coupling_upper not in ("AC", "DC"):
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Invalid coupling '{coupling}' for '{amp_name}'. " f"Use 'AC' or 'DC'."
|
||||
)
|
||||
|
||||
pin13_high = coupling_upper == "DC"
|
||||
|
||||
try:
|
||||
rio = self._get_rio(amp_name)
|
||||
ch = self._get_cfg(amp_name)["channels"]
|
||||
self._dout_set(rio, ch["coupling"], pin13_high)
|
||||
|
||||
logger.info(f"{self.TAG} [{amp_name}] coupling set to {coupling_upper}")
|
||||
print(f"{amp_name}: coupling -> {coupling_upper}")
|
||||
|
||||
except cSAXSDLPCA200Error:
|
||||
raise
|
||||
except Exception as exc:
|
||||
raise cSAXSDLPCA200Error(
|
||||
f"{self.TAG} Failed to set coupling on '{amp_name}': {exc}"
|
||||
) from exc
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public API - readback / reporting
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def read_settings(self, amp_name: str) -> dict:
|
||||
"""
|
||||
Read back the current settings from hardware digital outputs.
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict with keys:
|
||||
"amp_name" : str
|
||||
"gain" : int or None - gain in V/A (None if unknown bit pattern)
|
||||
"mode" : str - "low-noise" or "high-speed"
|
||||
"coupling" : str - "AC" or "DC"
|
||||
"bits" : dict - raw bit values {msb, mid, lsb, speed_mode, coupling}
|
||||
"""
|
||||
rio = self._get_rio(amp_name)
|
||||
ch = self._get_cfg(amp_name)["channels"]
|
||||
|
||||
msb = self._dout_get(rio, ch["gain_msb"])
|
||||
mid = self._dout_get(rio, ch["gain_mid"])
|
||||
lsb = self._dout_get(rio, ch["gain_lsb"])
|
||||
speed_mode = self._dout_get(rio, ch["speed_mode"])
|
||||
coupling_bit = self._dout_get(rio, ch["coupling"])
|
||||
|
||||
gain = self._decode_gain(msb, mid, lsb, speed_mode)
|
||||
mode = "low-noise" if speed_mode else "high-speed"
|
||||
coupling = "DC" if coupling_bit else "AC"
|
||||
|
||||
return {
|
||||
"amp_name": amp_name,
|
||||
"gain": gain,
|
||||
"mode": mode,
|
||||
"coupling": coupling,
|
||||
"bits": {
|
||||
"msb": msb,
|
||||
"mid": mid,
|
||||
"lsb": lsb,
|
||||
"speed_mode": speed_mode,
|
||||
"coupling": coupling_bit,
|
||||
},
|
||||
}
|
||||
|
||||
def info(self, amp_name: str) -> None:
|
||||
"""
|
||||
Print a plain summary of the current settings for one amplifier.
|
||||
|
||||
Example output
|
||||
--------------
|
||||
Amplifier : bpm4
|
||||
Description : Beam Position Monitor 4 current amplifier
|
||||
RIO device : galilrioesxbox
|
||||
Gain : 1.00e+07 V/A
|
||||
Mode : low-noise
|
||||
Coupling : DC
|
||||
Raw bits : MSB=1 MID=0 LSB=0 speed=1 coup=1
|
||||
"""
|
||||
cfg = self._get_cfg(amp_name)
|
||||
|
||||
try:
|
||||
s = self.read_settings(amp_name)
|
||||
except Exception as exc:
|
||||
print(f"{self.TAG} [{amp_name}] Could not read settings: {exc}")
|
||||
return
|
||||
|
||||
gain_str = (
|
||||
f"{s['gain']:.2e} V/A" if s["gain"] is not None else "UNKNOWN (invalid bit pattern)"
|
||||
)
|
||||
bits = s["bits"]
|
||||
|
||||
print(f" {'Amplifier':<12}: {amp_name}")
|
||||
print(f" {'Description':<12}: {cfg.get('description', '')}")
|
||||
print(f" {'RIO device':<12}: {cfg['rio_device']}")
|
||||
print(f" {'Gain':<12}: {gain_str}")
|
||||
print(f" {'Mode':<12}: {s['mode']}")
|
||||
print(f" {'Coupling':<12}: {s['coupling']}")
|
||||
print(
|
||||
f" {'Raw bits':<12}: MSB={bits['msb']} MID={bits['mid']} LSB={bits['lsb']} speed={bits['speed_mode']} coup={bits['coupling']}"
|
||||
)
|
||||
|
||||
def info_all(self) -> None:
|
||||
"""
|
||||
Print a plain summary for ALL configured amplifiers.
|
||||
"""
|
||||
print("\nDLPCA-200 Amplifier Status Report")
|
||||
print("-" * 40)
|
||||
for amp_name in sorted(self._config.keys()):
|
||||
self.info(amp_name)
|
||||
print()
|
||||
|
||||
def list_amplifiers(self) -> list[str]:
|
||||
"""Return sorted list of configured amplifier names."""
|
||||
return sorted(self._config.keys())
|
||||
@@ -41,8 +41,10 @@ import builtins
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
class cSAXSFilterTransmission:
|
||||
"""
|
||||
|
||||
@@ -8,11 +8,14 @@ from bec_lib import bec_logger
|
||||
logger = bec_logger.logger
|
||||
|
||||
# Pull BEC globals if present
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
|
||||
class cSAXSInitSmaractStagesError(Exception):
|
||||
pass
|
||||
@@ -383,7 +386,6 @@ class cSAXSInitSmaractStages:
|
||||
if not self._yesno("Proceed with the motions listed above?", "y"):
|
||||
logger.info("[cSAXS] Motion to initial position aborted by user.")
|
||||
return
|
||||
|
||||
# --- Execution phase (SIMULTANEOUS MOTION) ---
|
||||
if umv is None:
|
||||
logger.error("[cSAXS] 'umv' is not available in this session.")
|
||||
|
||||
@@ -22,8 +22,10 @@ logger = bec_logger.logger
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
|
||||
class FlomniToolsError(Exception):
|
||||
|
||||
@@ -7,8 +7,10 @@ from bec_widgets.cli.client import BECDockArea
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
|
||||
class flomniGuiToolsError(Exception):
|
||||
|
||||
@@ -13,8 +13,10 @@ logger = bec_logger.logger
|
||||
# import builtins to avoid linter errors
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_ipython_client.plugins.flomni import Flomni
|
||||
|
||||
@@ -7,8 +7,10 @@ from bec_widgets.cli.client import BECDockArea
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
|
||||
class OMNYGuiToolsError(Exception):
|
||||
|
||||
@@ -27,9 +27,10 @@ logger = bec_logger.logger
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
class OMNYInitError(Exception):
|
||||
pass
|
||||
|
||||
@@ -16,8 +16,10 @@ from rich.table import Table
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
|
||||
class OMNYToolsError(Exception):
|
||||
|
||||
@@ -16,8 +16,10 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fsh
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
|
||||
class OMNYTransferError(Exception):
|
||||
|
||||
@@ -13,8 +13,10 @@ logger = bec_logger.logger
|
||||
# import builtins to avoid linter errors
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
def umv(*args):
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_ipython_client.plugins.omny import OMNY
|
||||
|
||||
@@ -30,29 +30,74 @@ logger = bec_logger.logger
|
||||
|
||||
logger.info("Using the cSAXS startup script.")
|
||||
|
||||
# pylint: disable=import-error
|
||||
_args = _main_dict["args"]
|
||||
|
||||
_session_name = "cSAXS"
|
||||
if _args.session.lower() == "lamni":
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
|
||||
from csaxs_bec.bec_ipython_client.plugins.LamNI import *
|
||||
|
||||
_session_name = "LamNI"
|
||||
lamni = LamNI(bec)
|
||||
logger.success("LamNI session loaded.")
|
||||
|
||||
elif _args.session.lower() == "csaxs":
|
||||
print("Loading cSAXS session")
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
|
||||
|
||||
logger.success("cSAXS session loaded.")
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.tool_box.debug_tools import DebugTools
|
||||
|
||||
debug = DebugTools()
|
||||
logger.success("Debug tools loaded. Use 'debug' to access them.")
|
||||
|
||||
# pylint: disable=import-error
|
||||
_args = _main_dict["args"]
|
||||
|
||||
_session_name = "cSAXS"
|
||||
|
||||
print("Loading cSAXS session")
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS.cSAXS import cSAXS
|
||||
csaxs = cSAXS(bec)
|
||||
logger.success("cSAXS session loaded.")
|
||||
|
||||
|
||||
if _args.session.lower() == "lamni":
|
||||
from csaxs_bec.bec_ipython_client.plugins.LamNI import LamNI
|
||||
|
||||
_session_name = "LamNI"
|
||||
lamni = LamNI(bec)
|
||||
logger.success("LamNI session loaded.")
|
||||
print(r"""
|
||||
██████╗ ███████╗ ██████╗ ██╗ █████╗ ███╗ ███╗███╗ ██╗██╗
|
||||
██╔══██╗██╔════╝██╔════╝ ██║ ██╔══██╗████╗ ████║████╗ ██║██║
|
||||
██████╔╝█████╗ ██║ ██║ ███████║██╔████╔██║██╔██╗ ██║██║
|
||||
██╔══██╗██╔══╝ ██║ ██║ ██╔══██║██║╚██╔╝██║██║╚██╗██║██║
|
||||
██████╔╝███████╗╚██████╗ ███████╗██║ ██║██║ ╚═╝ ██║██║ ╚████║██║
|
||||
╚═════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
|
||||
|
||||
B E C L a m N I
|
||||
""")
|
||||
|
||||
elif _args.session.lower() == "omny":
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni import OMNY
|
||||
|
||||
_session_name = "OMNY"
|
||||
omny = OMNY(bec)
|
||||
logger.success("OMNY session loaded.")
|
||||
print(r"""
|
||||
██████╗ ███████╗ ██████╗ ██████╗ ███╗ ███╗███╗ ██╗██╗ ██╗
|
||||
██╔══██╗██╔════╝██╔════╝ ██╔═══██╗████╗ ████║████╗ ██║╚██╗ ██╔╝
|
||||
██████╔╝█████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║ ╚████╔╝
|
||||
██╔══██╗██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║ ╚██╔╝
|
||||
██████╔╝███████╗╚██████╗ ╚██████╔╝██║ ╚═╝ ██║██║ ╚████║ ██║
|
||||
╚═════╝ ╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝
|
||||
|
||||
B E C O M N Y
|
||||
""")
|
||||
|
||||
elif _args.session.lower() == "flomni":
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni import Flomni
|
||||
|
||||
_session_name = "flomni"
|
||||
flomni = Flomni(bec)
|
||||
logger.success("flomni session loaded.")
|
||||
print(r"""
|
||||
██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗
|
||||
██╔══██╗██╔════╝██╔════╝ ██╔════╝██║ ██╔═══██╗████╗ ████║████╗ ██║██║
|
||||
██████╔╝█████╗ ██║ █████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║██║
|
||||
██╔══██╗██╔══╝ ██║ ██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║██║
|
||||
██████╔╝███████╗╚██████╗ ██║ ███████╗╚██████╔╝██║ ╚═╝ ██║██║ ╚████║██║
|
||||
╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
|
||||
|
||||
B E C f l O M N I
|
||||
""")
|
||||
|
||||
|
||||
# SETUP BEAMLINE INFO
|
||||
from bec_ipython_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo
|
||||
|
||||
@@ -13,69 +13,10 @@ logger = bec_logger.logger
|
||||
|
||||
|
||||
_Widgets = {
|
||||
"OmnyAlignment": "OmnyAlignment",
|
||||
"XRayEye": "XRayEye",
|
||||
}
|
||||
|
||||
|
||||
class OmnyAlignment(RPCBase):
|
||||
@property
|
||||
@rpc_call
|
||||
def enable_live_view(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@enable_live_view.setter
|
||||
@rpc_call
|
||||
def enable_live_view(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def user_message(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@user_message.setter
|
||||
@rpc_call
|
||||
def user_message(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def sample_name(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@sample_name.setter
|
||||
@rpc_call
|
||||
def sample_name(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def enable_move_buttons(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@enable_move_buttons.setter
|
||||
@rpc_call
|
||||
def enable_move_buttons(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
|
||||
class XRayEye(RPCBase):
|
||||
@rpc_call
|
||||
def active_roi(self) -> "BaseROI | None":
|
||||
@@ -83,20 +24,6 @@ class XRayEye(RPCBase):
|
||||
Return the currently active ROI, or None if no ROI is active.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def enable_live_view(self):
|
||||
"""
|
||||
Get or set the live view enabled state.
|
||||
"""
|
||||
|
||||
@enable_live_view.setter
|
||||
@rpc_call
|
||||
def enable_live_view(self):
|
||||
"""
|
||||
Get or set the live view enabled state.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def user_message(self):
|
||||
|
||||
@@ -1,140 +0,0 @@
|
||||
|
||||
|
||||
from typing import TypedDict
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
import os
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.ui_loader import UILoader
|
||||
from qtpy.QtWidgets import QWidget, QPushButton, QLineEdit, QLabel, QVBoxLayout
|
||||
from bec_qthemes import material_icon
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
# class OmnyAlignmentUIComponents(TypedDict):
|
||||
# moveRightButton: QPushButton
|
||||
# moveLeftButton: QPushButton
|
||||
# moveUpButton: QPushButton
|
||||
# moveDownButton: QPushButton
|
||||
# image: Image
|
||||
|
||||
|
||||
class OmnyAlignment(BECWidget, QWidget):
|
||||
USER_ACCESS = ["enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter","sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
|
||||
PLUGIN = True
|
||||
ui_file = "./omny_alignment.ui"
|
||||
|
||||
def __init__(self, parent=None, **kwargs):
|
||||
super().__init__(parent=parent, **kwargs)
|
||||
|
||||
self._load_ui()
|
||||
|
||||
def _load_ui(self):
|
||||
current_path = os.path.dirname(__file__)
|
||||
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
|
||||
layout = QVBoxLayout()
|
||||
layout.addWidget(self.ui)
|
||||
self.setLayout(layout)
|
||||
|
||||
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
|
||||
self.ui.moveRightButton.setText("")
|
||||
self.ui.moveRightButton.setIcon(
|
||||
material_icon(icon_name="keyboard_arrow_right", **icon_options)
|
||||
)
|
||||
|
||||
self.ui.moveLeftButton.setText("")
|
||||
self.ui.moveLeftButton.setIcon(
|
||||
material_icon(icon_name="keyboard_arrow_left", **icon_options)
|
||||
)
|
||||
|
||||
self.ui.moveUpButton.setText("")
|
||||
self.ui.moveUpButton.setIcon(
|
||||
material_icon(icon_name="keyboard_arrow_up", **icon_options)
|
||||
)
|
||||
|
||||
self.ui.moveDownButton.setText("")
|
||||
self.ui.moveDownButton.setIcon(
|
||||
material_icon(icon_name="keyboard_arrow_down", **icon_options)
|
||||
)
|
||||
|
||||
self.ui.confirmButton.setText("OK")
|
||||
|
||||
|
||||
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
|
||||
|
||||
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
|
||||
|
||||
|
||||
@property
|
||||
def enable_live_view(self):
|
||||
return self.ui.liveViewSwitch.checked
|
||||
|
||||
@enable_live_view.setter
|
||||
def enable_live_view(self, enable:bool):
|
||||
self.ui.liveViewSwitch.checked = enable
|
||||
|
||||
|
||||
@property
|
||||
def user_message(self):
|
||||
return self.ui.messageLineEdit.text()
|
||||
|
||||
@user_message.setter
|
||||
def user_message(self, message:str):
|
||||
self.ui.messageLineEdit.setText(message)
|
||||
|
||||
@property
|
||||
def sample_name(self):
|
||||
return self.ui.sampleLineEdit.text()
|
||||
|
||||
@sample_name.setter
|
||||
def sample_name(self, message:str):
|
||||
self.ui.sampleLineEdit.setText(message)
|
||||
|
||||
|
||||
@SafeSlot(bool)
|
||||
def on_live_view_enabled(self, enabled:bool):
|
||||
from bec_widgets.widgets.plots.image.image import Image
|
||||
logger.info(f"Live view is enabled: {enabled}")
|
||||
image: Image = self.ui.image
|
||||
if enabled:
|
||||
image.image("cam_xeye")
|
||||
return
|
||||
|
||||
image.disconnect_monitor("cam_xeye")
|
||||
|
||||
|
||||
@property
|
||||
def enable_move_buttons(self):
|
||||
move_up:QPushButton = self.ui.moveUpButton
|
||||
move_down:QPushButton = self.ui.moveDownButton
|
||||
move_left:QPushButton = self.ui.moveLeftButton
|
||||
move_right:QPushButton = self.ui.moveRightButton
|
||||
return move_up.isEnabled() and move_down.isEnabled() and move_left.isEnabled() and move_right.isEnabled()
|
||||
|
||||
@enable_move_buttons.setter
|
||||
def enable_move_buttons(self, enabled:bool):
|
||||
move_up:QPushButton = self.ui.moveUpButton
|
||||
move_down:QPushButton = self.ui.moveDownButton
|
||||
move_left:QPushButton = self.ui.moveLeftButton
|
||||
move_right:QPushButton = self.ui.moveRightButton
|
||||
|
||||
move_up.setEnabled(enabled)
|
||||
move_down.setEnabled(enabled)
|
||||
move_left.setEnabled(enabled)
|
||||
move_right.setEnabled(enabled)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from qtpy.QtWidgets import QApplication
|
||||
import sys
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
widget = OmnyAlignment()
|
||||
|
||||
widget.show()
|
||||
sys.exit(app.exec_())
|
||||
@@ -1 +0,0 @@
|
||||
{'files': ['omny_alignment.py']}
|
||||
@@ -1,125 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<ui version="4.0">
|
||||
<class>Form</class>
|
||||
<widget class="QWidget" name="Form">
|
||||
<property name="geometry">
|
||||
<rect>
|
||||
<x>0</x>
|
||||
<y>0</y>
|
||||
<width>988</width>
|
||||
<height>821</height>
|
||||
</rect>
|
||||
</property>
|
||||
<property name="windowTitle">
|
||||
<string>Form</string>
|
||||
</property>
|
||||
<layout class="QGridLayout" name="gridLayout_3">
|
||||
<item row="2" column="2">
|
||||
<layout class="QGridLayout" name="gridLayout">
|
||||
<item row="1" column="2">
|
||||
<widget class="QPushButton" name="moveRightButton">
|
||||
<property name="text">
|
||||
<string>PushButton</string>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="1" column="0">
|
||||
<widget class="QPushButton" name="moveLeftButton">
|
||||
<property name="text">
|
||||
<string>PushButton</string>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="0" column="1">
|
||||
<widget class="QPushButton" name="moveUpButton">
|
||||
<property name="text">
|
||||
<string>Up</string>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="2" column="1">
|
||||
<widget class="QPushButton" name="moveDownButton">
|
||||
<property name="text">
|
||||
<string>PushButton</string>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="1" column="1">
|
||||
<widget class="QPushButton" name="confirmButton">
|
||||
<property name="text">
|
||||
<string>PushButton</string>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
</layout>
|
||||
</item>
|
||||
<item row="2" column="0">
|
||||
<layout class="QGridLayout" name="gridLayout_2">
|
||||
<item row="0" column="1">
|
||||
<widget class="QLineEdit" name="sampleLineEdit"/>
|
||||
</item>
|
||||
<item row="1" column="1">
|
||||
<widget class="QLineEdit" name="messageLineEdit"/>
|
||||
</item>
|
||||
<item row="0" column="0">
|
||||
<widget class="QLabel" name="label">
|
||||
<property name="text">
|
||||
<string>Sample</string>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="1" column="0">
|
||||
<widget class="QLabel" name="label_2">
|
||||
<property name="text">
|
||||
<string>Message</string>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
</layout>
|
||||
</item>
|
||||
<item row="1" column="0" colspan="3">
|
||||
<widget class="Image" name="image">
|
||||
<property name="enable_toolbar" stdset="0">
|
||||
<bool>false</bool>
|
||||
</property>
|
||||
<property name="inner_axes" stdset="0">
|
||||
<bool>false</bool>
|
||||
</property>
|
||||
<property name="monitor" stdset="0">
|
||||
<string>cam_xeye</string>
|
||||
</property>
|
||||
<property name="rotation" stdset="0">
|
||||
<number>3</number>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="0" column="3">
|
||||
<widget class="ToggleSwitch" name="liveViewSwitch"/>
|
||||
</item>
|
||||
<item row="0" column="2">
|
||||
<widget class="QLabel" name="label_3">
|
||||
<property name="text">
|
||||
<string>Live View</string>
|
||||
</property>
|
||||
<property name="alignment">
|
||||
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
</layout>
|
||||
</widget>
|
||||
<customwidgets>
|
||||
<customwidget>
|
||||
<class>Image</class>
|
||||
<extends>QWidget</extends>
|
||||
<header>image</header>
|
||||
</customwidget>
|
||||
<customwidget>
|
||||
<class>ToggleSwitch</class>
|
||||
<extends>QWidget</extends>
|
||||
<header>toggle_switch</header>
|
||||
</customwidget>
|
||||
</customwidgets>
|
||||
<resources/>
|
||||
<connections/>
|
||||
</ui>
|
||||
@@ -1,54 +0,0 @@
|
||||
# Copyright (C) 2022 The Qt Company Ltd.
|
||||
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
|
||||
|
||||
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment import OmnyAlignment
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
<widget class='OmnyAlignment' name='omny_alignment'>
|
||||
</widget>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
|
||||
class OmnyAlignmentPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._form_editor = None
|
||||
|
||||
def createWidget(self, parent):
|
||||
t = OmnyAlignment(parent)
|
||||
return t
|
||||
|
||||
def domXml(self):
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return ""
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(OmnyAlignment.ICON_NAME)
|
||||
|
||||
def includeFile(self):
|
||||
return "omny_alignment"
|
||||
|
||||
def initialize(self, form_editor):
|
||||
self._form_editor = form_editor
|
||||
|
||||
def isContainer(self):
|
||||
return False
|
||||
|
||||
def isInitialized(self):
|
||||
return self._form_editor is not None
|
||||
|
||||
def name(self):
|
||||
return "OmnyAlignment"
|
||||
|
||||
def toolTip(self):
|
||||
return "OmnyAlignment"
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
@@ -1,15 +0,0 @@
|
||||
def main(): # pragma: no cover
|
||||
from qtpy import PYSIDE6
|
||||
|
||||
if not PYSIDE6:
|
||||
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from csaxs_bec.bec_widgets.widgets.omny_alignment.omny_alignment_plugin import OmnyAlignmentPlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(OmnyAlignmentPlugin())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
@@ -33,46 +33,48 @@ class XRayEye2DControl(BECWidget, QWidget):
|
||||
self.get_bec_shortcuts()
|
||||
self._step_size = step_size
|
||||
self.root_layout = QGridLayout(self)
|
||||
self.setStyleSheet("""
|
||||
self.setStyleSheet(
|
||||
"""
|
||||
QToolButton {
|
||||
border: 1px solid;
|
||||
border-radius: 4px;
|
||||
}
|
||||
""")
|
||||
"""
|
||||
)
|
||||
# Up
|
||||
self.move_up_button = QToolButton(parent=self)
|
||||
self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
|
||||
self.move_up_button.setIcon(material_icon("keyboard_double_arrow_up"))
|
||||
self.root_layout.addWidget(self.move_up_button, 0, 2)
|
||||
# Up tweak button
|
||||
self.move_up_tweak_button = QToolButton(parent=self)
|
||||
self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
|
||||
self.move_up_tweak_button.setIcon(material_icon("keyboard_arrow_up"))
|
||||
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
|
||||
|
||||
# Left
|
||||
self.move_left_button = QToolButton(parent=self)
|
||||
self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
|
||||
self.move_left_button.setIcon(material_icon("keyboard_double_arrow_left"))
|
||||
self.root_layout.addWidget(self.move_left_button, 2, 0)
|
||||
# Left tweak button
|
||||
self.move_left_tweak_button = QToolButton(parent=self)
|
||||
self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
|
||||
self.move_left_tweak_button.setIcon(material_icon("keyboard_arrow_left"))
|
||||
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
|
||||
|
||||
# Right
|
||||
self.move_right_button = QToolButton(parent=self)
|
||||
self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
|
||||
self.move_right_button.setIcon(material_icon("keyboard_double_arrow_right"))
|
||||
self.root_layout.addWidget(self.move_right_button, 2, 4)
|
||||
# Right tweak button
|
||||
self.move_right_tweak_button = QToolButton(parent=self)
|
||||
self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
|
||||
self.move_right_tweak_button.setIcon(material_icon("keyboard_arrow_right"))
|
||||
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
|
||||
|
||||
# Down
|
||||
self.move_down_button = QToolButton(parent=self)
|
||||
self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
|
||||
self.move_down_button.setIcon(material_icon("keyboard_double_arrow_down"))
|
||||
self.root_layout.addWidget(self.move_down_button, 4, 2)
|
||||
# Down tweak button
|
||||
self.move_down_tweak_button = QToolButton(parent=self)
|
||||
self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
|
||||
self.move_down_tweak_button.setIcon(material_icon("keyboard_arrow_down"))
|
||||
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
|
||||
|
||||
# Connections
|
||||
@@ -124,8 +126,15 @@ class XRayEye2DControl(BECWidget, QWidget):
|
||||
|
||||
|
||||
class XRayEye(BECWidget, QWidget):
|
||||
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
|
||||
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
|
||||
USER_ACCESS = [
|
||||
"active_roi",
|
||||
"user_message",
|
||||
"user_message.setter",
|
||||
"sample_name",
|
||||
"sample_name.setter",
|
||||
"enable_move_buttons",
|
||||
"enable_move_buttons.setter",
|
||||
]
|
||||
PLUGIN = True
|
||||
|
||||
def __init__(self, parent=None, **kwargs):
|
||||
@@ -136,7 +145,9 @@ class XRayEye(BECWidget, QWidget):
|
||||
self._make_connections()
|
||||
|
||||
# Connection to redis endpoints
|
||||
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.device_updates, MessageEndpoints.device_readback("omny_xray_gui")
|
||||
)
|
||||
self.connect_motors()
|
||||
self.resize(800, 600)
|
||||
QTimer.singleShot(0, self._init_gui_trigger)
|
||||
@@ -145,7 +156,9 @@ class XRayEye(BECWidget, QWidget):
|
||||
self.core_layout = QHBoxLayout(self)
|
||||
|
||||
self.image = Image(parent=self)
|
||||
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
|
||||
self.image.enable_toolbar = (
|
||||
False # Disable default toolbar to not allow to user set anything
|
||||
)
|
||||
self.image.inner_axes = False # Disable inner axes to maximize image area
|
||||
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
|
||||
|
||||
@@ -156,8 +169,9 @@ class XRayEye(BECWidget, QWidget):
|
||||
self.control_panel_layout.setSpacing(10)
|
||||
|
||||
# ROI toolbar + Live toggle (header row)
|
||||
self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
|
||||
compact_orientation="horizontal")
|
||||
self.roi_manager = ROIPropertyTree(
|
||||
parent=self, image_widget=self.image, compact=True, compact_orientation="horizontal"
|
||||
)
|
||||
header_row = QHBoxLayout()
|
||||
header_row.setContentsMargins(0, 0, 0, 0)
|
||||
header_row.setSpacing(8)
|
||||
@@ -230,7 +244,9 @@ class XRayEye(BECWidget, QWidget):
|
||||
|
||||
# Make connections
|
||||
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
|
||||
self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
|
||||
self.step_size.valueChanged.connect(
|
||||
lambda x: self.motor_control_2d.setProperty("step_size", x)
|
||||
)
|
||||
self.submit_button.clicked.connect(self.submit)
|
||||
|
||||
def _create_separator(self):
|
||||
@@ -248,12 +264,14 @@ class XRayEye(BECWidget, QWidget):
|
||||
################################################################################
|
||||
|
||||
def connect_motors(self):
|
||||
""" Checks one of the possible motors for flomni, omny and lamni setup."""
|
||||
possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
|
||||
"""Checks one of the possible motors for flomni, omny and lamni setup."""
|
||||
possible_motors = ["osamroy", "lsamrot", "fsamroy"]
|
||||
|
||||
for motor in possible_motors:
|
||||
if motor in self.dev:
|
||||
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
|
||||
self.bec_dispatcher.connect_slot(
|
||||
self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor)
|
||||
)
|
||||
logger.info(f"Succesfully connected to {motor}")
|
||||
|
||||
################################################################################
|
||||
@@ -341,7 +359,7 @@ class XRayEye(BECWidget, QWidget):
|
||||
|
||||
@SafeSlot(bool, bool)
|
||||
def on_tomo_angle_readback(self, data: dict, meta: dict):
|
||||
#TODO implement if needed
|
||||
# TODO implement if needed
|
||||
print(f"data: {data}")
|
||||
print(f"meta: {meta}")
|
||||
|
||||
@@ -355,25 +373,25 @@ class XRayEye(BECWidget, QWidget):
|
||||
meta(dict): metadata from device
|
||||
"""
|
||||
|
||||
signals = data.get('signals')
|
||||
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
|
||||
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
|
||||
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
|
||||
signals = data.get("signals")
|
||||
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get("value")
|
||||
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get("value")
|
||||
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get("value")
|
||||
self.on_live_view_enabled(bool(enable_live_preview))
|
||||
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
|
||||
|
||||
# Signals from epics gui device
|
||||
# send message
|
||||
user_message = signals.get("omny_xray_gui_send_message").get('value')
|
||||
user_message = signals.get("omny_xray_gui_send_message").get("value")
|
||||
self.user_message = user_message
|
||||
# sample name
|
||||
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
|
||||
sample_message = signals.get("omny_xray_gui_sample_name").get("value")
|
||||
self.sample_name = sample_message
|
||||
# enable frame acquisition
|
||||
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
|
||||
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get("value")
|
||||
self.on_live_view_enabled(bool(update_frame_acq))
|
||||
# enable submit button
|
||||
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
|
||||
enable_submit_button = signals.get("omny_xray_gui_submit").get("value")
|
||||
self.enable_submit_button(enable_submit_button)
|
||||
|
||||
@SafeSlot()
|
||||
@@ -383,35 +401,40 @@ class XRayEye(BECWidget, QWidget):
|
||||
logger.warning("No active ROI")
|
||||
return
|
||||
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
|
||||
roi_center_x = roi_coordinates['center_x']
|
||||
roi_center_y = roi_coordinates['center_y']
|
||||
roi_center_x = roi_coordinates["center_x"]
|
||||
roi_center_y = roi_coordinates["center_y"]
|
||||
# Case of rectangular ROI
|
||||
if isinstance(self.roi_manager.single_active_roi, RectangularROI):
|
||||
roi_width = roi_coordinates['width']
|
||||
roi_height = roi_coordinates['height']
|
||||
roi_width = roi_coordinates["width"]
|
||||
roi_height = roi_coordinates["height"]
|
||||
elif isinstance(self.roi_manager.single_active_roi, CircularROI):
|
||||
roi_width = roi_coordinates['diameter']
|
||||
roi_height = roi_coordinates['radius']
|
||||
roi_width = roi_coordinates["diameter"]
|
||||
roi_height = roi_coordinates["radius"]
|
||||
else:
|
||||
logger.warning("Unsupported ROI type for submit action.")
|
||||
return
|
||||
|
||||
print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
|
||||
print(
|
||||
f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}"
|
||||
) # TODO remove when will be not needed for debugging
|
||||
# submit roi coordinates
|
||||
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
|
||||
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get("value"))
|
||||
|
||||
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
|
||||
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
|
||||
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
|
||||
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
|
||||
self.dev.omny_xray_gui.submit.set(1)
|
||||
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
|
||||
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
|
||||
getattr(self.dev,CAMERA[0]).live_mode = False
|
||||
self.bec_dispatcher.disconnect_slot(
|
||||
self.device_updates, MessageEndpoints.device_readback("omny_xray_gui")
|
||||
)
|
||||
getattr(self.dev, CAMERA[0]).live_mode = False
|
||||
super().cleanup()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
|
||||
@@ -9,27 +9,27 @@ eiger_1_5:
|
||||
readoutPriority: async
|
||||
softwareTrigger: False
|
||||
|
||||
eiger_9:
|
||||
description: Eiger 9M detector
|
||||
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
|
||||
deviceConfig:
|
||||
detector_distance: 100
|
||||
beam_center: [0, 0]
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: False
|
||||
# eiger_9:
|
||||
# description: Eiger 9M detector
|
||||
# deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
|
||||
# deviceConfig:
|
||||
# detector_distance: 100
|
||||
# beam_center: [0, 0]
|
||||
# onFailure: raise
|
||||
# enabled: true
|
||||
# readoutPriority: async
|
||||
# softwareTrigger: False
|
||||
|
||||
ids_cam:
|
||||
description: IDS camera for live image acquisition
|
||||
deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
|
||||
deviceConfig:
|
||||
camera_id: 201
|
||||
bits_per_pixel: 24
|
||||
m_n_colormode: 1
|
||||
live_mode: True
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: True
|
||||
# ids_cam:
|
||||
# description: IDS camera for live image acquisition
|
||||
# deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
|
||||
# deviceConfig:
|
||||
# camera_id: 201
|
||||
# bits_per_pixel: 24
|
||||
# m_n_colormode: 1
|
||||
# live_mode: True
|
||||
# onFailure: raise
|
||||
# enabled: true
|
||||
# readoutPriority: async
|
||||
# softwareTrigger: True
|
||||
|
||||
|
||||
25
csaxs_bec/device_configs/bl_general.yaml
Normal file
25
csaxs_bec/device_configs/bl_general.yaml
Normal file
@@ -0,0 +1,25 @@
|
||||
############################################################
|
||||
##################### EPS ##################################
|
||||
############################################################
|
||||
x12saEPS:
|
||||
description: X12SA EPS info and control
|
||||
deviceClass: csaxs_bec.devices.epics.eps.EPS
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
############################################################
|
||||
##################### GalilRIO #############################
|
||||
############################################################
|
||||
|
||||
galilrioesxbox:
|
||||
description: Galil RIO for remote gain switching and slow reading ES XBox
|
||||
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
|
||||
deviceConfig:
|
||||
host: galilrioesxbox.psi.ch
|
||||
enabled: true
|
||||
onFailure: raise
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
@@ -1,11 +1,11 @@
|
||||
# This is the main configuration file that is
|
||||
# commented or uncommented according to the type of experiment
|
||||
|
||||
optics:
|
||||
- !include ./bl_optics_hutch.yaml
|
||||
# optics:
|
||||
# - !include ./bl_optics_hutch.yaml
|
||||
|
||||
frontend:
|
||||
- !include ./bl_frontend.yaml
|
||||
# frontend:
|
||||
# - !include ./bl_frontend.yaml
|
||||
|
||||
endstation:
|
||||
- !include ./bl_endstation.yaml
|
||||
@@ -16,8 +16,8 @@ detectors:
|
||||
#sastt:
|
||||
# - !include ./sastt.yaml
|
||||
|
||||
#flomni:
|
||||
# - !include ./ptycho_flomni.yaml
|
||||
flomni:
|
||||
- !include ./ptycho_flomni.yaml
|
||||
|
||||
#omny:
|
||||
# - !include ./ptycho_omny.yaml
|
||||
|
||||
@@ -471,7 +471,7 @@ omnyfsh:
|
||||
#################### GUI Signals ###########################
|
||||
############################################################
|
||||
omny_xray_gui:
|
||||
description: Gui Epics signals
|
||||
description: Gui signals
|
||||
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
@@ -486,4 +486,25 @@ calculated_signal:
|
||||
compute_method: "def just_rand():\n return 42"
|
||||
enabled: true
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
readoutPriority: baseline
|
||||
|
||||
############################################################
|
||||
#################### OMNY Pandabox #########################
|
||||
############################################################
|
||||
omny_panda:
|
||||
readoutPriority: async
|
||||
deviceClass: csaxs_bec.devices.panda_box.panda_box_omny.PandaBoxOMNY
|
||||
deviceConfig:
|
||||
host: omny-panda.psi.ch
|
||||
signal_alias:
|
||||
FMC_IN.VAL1.Min: cap_voltage_fzp_y_min
|
||||
FMC_IN.VAL1.Max: cap_voltage_fzp_y_max
|
||||
FMC_IN.VAL1.Mean: cap_voltage_fzp_y_mean
|
||||
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
|
||||
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
|
||||
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
|
||||
deviceTags:
|
||||
- detector
|
||||
enabled: true
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
@@ -309,7 +309,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
# Shutter opens without delay at t0, closes after exp_time * burst_count + 2ms (self._shutter_to_open_delay)
|
||||
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
|
||||
|
||||
self.set_delay_pairs(channel="gh", delay=self._shutter_to_open_delay, width=(shutter_width-self._shutter_to_open_delay))
|
||||
self.set_delay_pairs(
|
||||
channel="gh",
|
||||
delay=self._shutter_to_open_delay,
|
||||
width=(shutter_width - self._shutter_to_open_delay),
|
||||
)
|
||||
|
||||
# Trigger extra pulse for MCS OR gate
|
||||
# f = e + 1us
|
||||
@@ -520,7 +524,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
- Return the status object to BEC which will automatically resolve once the status register has
|
||||
the END_OF_BURST bit set. The callback of the status object will also stop the polling loop.
|
||||
"""
|
||||
overall_start = time.time()
|
||||
self._stop_polling()
|
||||
|
||||
# NOTE If the trigger source is not SINGLE_SHOT, the DDG is triggered by an external source
|
||||
@@ -559,7 +562,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
# Send trigger
|
||||
self.trigger_shot.put(1, use_complete=True)
|
||||
self.cancel_on_stop(status)
|
||||
logger.info(f"Configured ddg in {time.time()-overall_start}")
|
||||
return status
|
||||
|
||||
def on_stop(self) -> None:
|
||||
|
||||
@@ -261,7 +261,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
**kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance).
|
||||
"""
|
||||
with self._rlock:
|
||||
logger.info(f"Received update on mcs card {self.name}")
|
||||
if self._omit_mca_callbacks.is_set():
|
||||
return # Suppress callbacks when erasing all channels
|
||||
self._mca_counter_index += 1
|
||||
@@ -293,9 +292,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
)
|
||||
|
||||
# Once we have received all channels, push data to BEC and reset for next accumulation
|
||||
logger.info(
|
||||
f"Received update for {attr_name}, index {self._mca_counter_index}/{self.NUM_MCA_CHANNELS}"
|
||||
)
|
||||
if len(self._current_data) == self.NUM_MCA_CHANNELS:
|
||||
logger.debug(
|
||||
f"Current data index {self._current_data_index} complete, pushing to BEC."
|
||||
@@ -396,13 +392,15 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
self._current_data_index = 0
|
||||
|
||||
# NOTE Make sure that the signal that omits mca callbacks is cleared
|
||||
# DO NOT REMOVE!!
|
||||
self._omit_mca_callbacks.clear()
|
||||
|
||||
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
|
||||
# For a fly scan we need to start the mcs card ourselves
|
||||
if self.scan_info.msg.scan_type == "fly":
|
||||
self.erase_start.put(1)
|
||||
|
||||
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
|
||||
|
||||
def on_prescan(self) -> None | StatusBase:
|
||||
"""
|
||||
This method is called after on_stage and before the scan starts. For the MCS card, we need to make sure
|
||||
@@ -446,7 +444,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
while not self._scan_done_thread_kill_event.is_set():
|
||||
while self._start_monitor_async_data_emission.wait():
|
||||
try:
|
||||
logger.debug(f"Monitoring async data emission for {self.name}...")
|
||||
if (
|
||||
hasattr(self.scan_info.msg, "num_points")
|
||||
and self.scan_info.msg.num_points is not None
|
||||
@@ -456,7 +453,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
for callback in self._scan_done_callbacks:
|
||||
callback(exception=None)
|
||||
else:
|
||||
logger.info(f"Current data index is {self._current_data_index}")
|
||||
if self._current_data_index >= 1:
|
||||
for callback in self._scan_done_callbacks:
|
||||
callback(exception=None)
|
||||
@@ -552,8 +548,9 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Hook called when the device is stopped. In addition, any status that is registered through cancel_on_stop will be cancelled here."""
|
||||
self.stop_all.put(1)
|
||||
self.erase_all.put(1)
|
||||
with suppress_mca_callbacks(self):
|
||||
self.stop_all.put(1)
|
||||
self.erase_all.put(1)
|
||||
|
||||
def mcs_recovery(self, timeout: int = 1) -> None:
|
||||
"""
|
||||
|
||||
@@ -132,7 +132,6 @@ class Eiger(PSIDeviceBase):
|
||||
if data is None:
|
||||
logger.error(f"Received image message on device {self.name} without data.")
|
||||
return
|
||||
logger.info(f"Received preview image on device {self.name}")
|
||||
self.preview_image.put(data)
|
||||
|
||||
# pylint: disable=missing-function-docstring
|
||||
|
||||
@@ -13,6 +13,7 @@ which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS varia
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
@@ -78,12 +79,38 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
|
||||
"""
|
||||
|
||||
_NUM_ANALOG_CHANNELS = 8
|
||||
READBACK_TIMEOUT = 0.1 # time to wait in between two readback attemps in seconds, otherwise return cached value
|
||||
|
||||
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
signal_name: str,
|
||||
channel: int,
|
||||
parent: GalilRIO,
|
||||
readback_timeout: float = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
|
||||
self._channel = channel
|
||||
self._metadata["connected"] = False
|
||||
self._readback_timeout = (
|
||||
readback_timeout if readback_timeout is not None else self.READBACK_TIMEOUT
|
||||
)
|
||||
self._metadata["write_access"] = False
|
||||
self._last_readback = 0.0
|
||||
|
||||
def get(self):
|
||||
current_time = time.monotonic()
|
||||
if current_time - self._last_readback > self._readback_timeout:
|
||||
old_value = self._readback
|
||||
self._last_readback = current_time # _socket_get may rely on this value to be set.
|
||||
self._readback = self._socket_get()
|
||||
self._run_subs(
|
||||
sub_type=self.SUB_VALUE,
|
||||
old_value=old_value,
|
||||
value=self._readback,
|
||||
timestamp=current_time,
|
||||
)
|
||||
return self._readback
|
||||
|
||||
def _socket_set(self, val):
|
||||
"""Read-only signal, so set method raises an error."""
|
||||
@@ -136,6 +163,8 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
|
||||
|
||||
# Run subscriptions after all readbacks have been updated
|
||||
# on all channels except the one that triggered the update
|
||||
# TODO for now skip running subscribers, this should be re-implemented
|
||||
# once we properly handle subscriptions from bec running "read"
|
||||
for walk in self.parent.walk_signals():
|
||||
if walk.item.attr_name in updates:
|
||||
new_val, old_val = updates[walk.item.attr_name]
|
||||
@@ -185,7 +214,7 @@ def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
|
||||
an_channels[f"ch{i}"] = (
|
||||
GalilRIOAnalogSignalRO,
|
||||
f"ch{i}",
|
||||
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."},
|
||||
{"kind": Kind.normal, "channel": i, "doc": f"Analog channel {i}."},
|
||||
)
|
||||
return an_channels
|
||||
|
||||
@@ -202,12 +231,7 @@ def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
|
||||
di_out_channels[f"ch{i}"] = (
|
||||
GalilRIODigitalOutSignal,
|
||||
f"ch{i}",
|
||||
{
|
||||
"kind": Kind.config,
|
||||
"notify_bec": True,
|
||||
"channel": i,
|
||||
"doc": f"Digital output channel {i}.",
|
||||
},
|
||||
{"kind": Kind.config, "channel": i, "doc": f"Digital output channel {i}."},
|
||||
)
|
||||
return di_out_channels
|
||||
|
||||
|
||||
@@ -65,10 +65,8 @@ class RtLamniController(Controller):
|
||||
"_position_sampling_single_read",
|
||||
"_position_sampling_single_reset_and_start_sampling",
|
||||
"show_signal_strength_interferometer",
|
||||
"show_interferometer_positions",
|
||||
"show_analog_signals",
|
||||
"show_feedback_status",
|
||||
|
||||
]
|
||||
|
||||
def __init__(
|
||||
|
||||
103
csaxs_bec/devices/panda_box/panda_box.py
Normal file
103
csaxs_bec/devices/panda_box/panda_box.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Module to integrate the PandaBox for cSAXS measurements."""
|
||||
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd_devices import StatusBase
|
||||
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class PandaBoxCSAXS(PandaBox):
|
||||
"""
|
||||
PandaBox integration for cSAXS. This class implements cSAXS specific logic for the PandaBox integration.
|
||||
|
||||
TODO: This logic is not yet mapped to any existing hardware. Adapt Docstring once the hardware is defined and integrated.
|
||||
"""
|
||||
|
||||
def on_init(self):
|
||||
super().on_init()
|
||||
self._acquisition_group = "burst"
|
||||
self._timeout_on_completed = 10
|
||||
|
||||
def on_stage(self):
|
||||
start_time = time.time()
|
||||
super().on_stage()
|
||||
# TODO, adjust as seen fit.
|
||||
# Adjust the acquisition group based on scan parameters if needed
|
||||
if self.scan_info.msg.scan_type == "fly":
|
||||
self._acquisition_group = "fly"
|
||||
elif self.scan_info.msg.scan_type == "step":
|
||||
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
|
||||
self._acquisition_group = "monitored"
|
||||
else:
|
||||
self._acquisition_group = "burst"
|
||||
|
||||
logger.info(f"PandaBox {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
|
||||
|
||||
def on_complete(self):
|
||||
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
|
||||
|
||||
def _check_capture_complete():
|
||||
captured = 0
|
||||
start_time = time.monotonic()
|
||||
try:
|
||||
expected_points = int(
|
||||
self.scan_info.msg.num_points
|
||||
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
|
||||
)
|
||||
while captured < expected_points:
|
||||
ret = self.send_raw("*PCAP.CAPTURED?")
|
||||
captured = int(ret[0].split("=")[-1])
|
||||
time.sleep(0.01)
|
||||
if (time.monotonic() - start_time) > self._timeout_on_completed / 2:
|
||||
logger.info(
|
||||
f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
|
||||
)
|
||||
if (time.monotonic() - start_time) > self._timeout_on_completed:
|
||||
raise TimeoutError(
|
||||
f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
|
||||
)
|
||||
finally:
|
||||
self._disarm()
|
||||
|
||||
status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
|
||||
self.cancel_on_stop(status_captured)
|
||||
return status_captured
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import time
|
||||
|
||||
panda = PandaBoxCSAXS(
|
||||
name="omny_panda",
|
||||
host="omny-panda.psi.ch",
|
||||
signal_alias={
|
||||
"FMC_IN.VAL2.Value": "alias",
|
||||
"FMC_IN.VAL1.Min": "alias2",
|
||||
"FMC_IN.VAL1.Max": "alias3",
|
||||
"FMC_IN.VAL1.Mean": "alias4",
|
||||
},
|
||||
)
|
||||
panda.on_connected()
|
||||
status = StatusBase(obj=panda)
|
||||
panda.add_status_callback(
|
||||
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
|
||||
)
|
||||
panda.stop()
|
||||
status.wait(timeout=2)
|
||||
panda.unstage()
|
||||
logger.info(f"Panda connected")
|
||||
ret = panda.stage()
|
||||
logger.info(f"Panda staged")
|
||||
ret = panda.pre_scan()
|
||||
ret.wait(timeout=5)
|
||||
logger.info(f"Panda pre scan done")
|
||||
time.sleep(5)
|
||||
panda.stop()
|
||||
st = panda.complete()
|
||||
st.wait(timeout=5)
|
||||
logger.info(f"Measurement completed")
|
||||
panda.unstage()
|
||||
logger.info(f"Panda Unstaged")
|
||||
99
csaxs_bec/devices/panda_box/panda_box_omny.py
Normal file
99
csaxs_bec/devices/panda_box/panda_box_omny.py
Normal file
@@ -0,0 +1,99 @@
|
||||
"""Module to integrate the PandaBox for cSAXS measurements."""
|
||||
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd_devices import StatusBase
|
||||
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class PandaBoxOMNY(PandaBox):
|
||||
"""PandaBox integration for OMNY. This class implements OMNY specific logic for the PandaBox integration."""
|
||||
|
||||
def on_init(self):
|
||||
super().on_init()
|
||||
self._acquisition_group = "burst"
|
||||
self._timeout_on_completed = 10
|
||||
|
||||
def on_stage(self):
|
||||
start_time = time.time()
|
||||
super().on_stage()
|
||||
# TODO, adjust as seen fit.
|
||||
# Adjust the acquisition group based on scan parameters if needed
|
||||
if self.scan_info.msg.scan_type == "fly":
|
||||
self._acquisition_group = "fly"
|
||||
elif self.scan_info.msg.scan_type == "step":
|
||||
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
|
||||
self._acquisition_group = "monitored"
|
||||
else:
|
||||
self._acquisition_group = "burst"
|
||||
|
||||
logger.info(f"PandaBox {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
|
||||
|
||||
def on_complete(self):
|
||||
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
|
||||
|
||||
def _check_capture_complete():
|
||||
captured = 0
|
||||
start_time = time.monotonic()
|
||||
try:
|
||||
expected_points = int(
|
||||
self.scan_info.msg.num_points
|
||||
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
|
||||
)
|
||||
while captured < expected_points:
|
||||
ret = self.send_raw("*PCAP.CAPTURED?")
|
||||
captured = int(ret[0].split("=")[-1])
|
||||
time.sleep(0.01)
|
||||
if (time.monotonic() - start_time) > self._timeout_on_completed / 2:
|
||||
logger.info(
|
||||
f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
|
||||
)
|
||||
if (time.monotonic() - start_time) > self._timeout_on_completed:
|
||||
raise TimeoutError(
|
||||
f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
|
||||
)
|
||||
finally:
|
||||
self._disarm()
|
||||
|
||||
status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
|
||||
self.cancel_on_stop(status_captured)
|
||||
return status_captured
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import time
|
||||
|
||||
panda = PandaBoxOMNY(
|
||||
name="omny_panda",
|
||||
host="omny-panda.psi.ch",
|
||||
signal_alias={
|
||||
"FMC_IN.VAL2.Value": "alias",
|
||||
"FMC_IN.VAL1.Min": "alias2",
|
||||
"FMC_IN.VAL1.Max": "alias3",
|
||||
"FMC_IN.VAL1.Mean": "alias4",
|
||||
},
|
||||
)
|
||||
panda.on_connected()
|
||||
status = StatusBase(obj=panda)
|
||||
panda.add_status_callback(
|
||||
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
|
||||
)
|
||||
panda.stop()
|
||||
status.wait(timeout=2)
|
||||
panda.unstage()
|
||||
logger.info(f"Panda connected")
|
||||
ret = panda.stage()
|
||||
logger.info(f"Panda staged")
|
||||
ret = panda.pre_scan()
|
||||
ret.wait(timeout=5)
|
||||
logger.info(f"Panda pre scan done")
|
||||
time.sleep(5)
|
||||
panda.stop()
|
||||
st = panda.complete()
|
||||
st.wait(timeout=5)
|
||||
logger.info(f"Measurement completed")
|
||||
panda.unstage()
|
||||
logger.info(f"Panda Unstaged")
|
||||
@@ -210,7 +210,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
|
||||
arg_input = {}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
def __init__(self, *args, parameter: dict = None, frames_per_trigger:int=1, exp_time:float=0,**kwargs):
|
||||
"""
|
||||
A LamNI scan following Fermat's spiral.
|
||||
|
||||
@@ -230,10 +230,10 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
|
||||
|
||||
Examples:
|
||||
>>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1)
|
||||
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1)
|
||||
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1, frames_per_trigger=1)
|
||||
"""
|
||||
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
super().__init__(parameter=parameter, frames_per_trigger=frames_per_trigger, exp_time=exp_time,**kwargs)
|
||||
self.axis = []
|
||||
scan_kwargs = parameter.get("kwargs", {})
|
||||
self.fov_size = scan_kwargs.get("fov_size")
|
||||
@@ -482,6 +482,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
|
||||
yield from self.open_scan()
|
||||
yield from self.stage()
|
||||
yield from self.run_baseline_reading()
|
||||
yield from self.pre_scan()
|
||||
yield from self.scan_core()
|
||||
yield from self.finalize()
|
||||
yield from self.unstage()
|
||||
|
||||
@@ -52,6 +52,7 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
angle: float = None,
|
||||
corridor_size: float = 3,
|
||||
parameter: dict = None,
|
||||
frames_per_trigger:int=1,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
@@ -62,7 +63,8 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
fovy(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 100 um
|
||||
cenx(float) [um]: center position in x.
|
||||
ceny(float) [um]: center position in y.
|
||||
exp_time(float) [s]: exposure time
|
||||
exp_time(float) [s]: exposure time per burst frame
|
||||
frames_per_trigger(int) : Number of burst frames per point
|
||||
step(float) [um]: stepsize
|
||||
zshift(float) [um]: shift in z
|
||||
angle(float) [deg]: rotation angle (will rotate first)
|
||||
@@ -71,10 +73,10 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
Returns:
|
||||
|
||||
Examples:
|
||||
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)
|
||||
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)
|
||||
"""
|
||||
|
||||
super().__init__(parameter=parameter, exp_time=exp_time, **kwargs)
|
||||
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
|
||||
self.show_live_table = False
|
||||
self.axis = []
|
||||
self.fovx = fovx
|
||||
@@ -323,6 +325,7 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
yield from self.stage()
|
||||
yield from self.run_baseline_reading()
|
||||
yield from self._prepare_setup_part2()
|
||||
yield from self.pre_scan()
|
||||
yield from self.scan_core()
|
||||
yield from self.finalize()
|
||||
yield from self.unstage()
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
""" Module with JungfrauJochTestScan class. """
|
||||
"""Module with JungfrauJochTestScan class."""
|
||||
|
||||
from bec_lib import bec_logger
|
||||
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion
|
||||
|
||||
@@ -51,6 +51,7 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
angle: float = None,
|
||||
corridor_size: float = 3,
|
||||
parameter: dict = None,
|
||||
frames_per_trigger:int=1,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
@@ -62,6 +63,7 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
cenx(float) [um]: center position in x.
|
||||
ceny(float) [um]: center position in y.
|
||||
exp_time(float) [s]: exposure time
|
||||
frames_per_trigger:int: Number of burst frames per trigger, defaults to 1.
|
||||
step(float) [um]: stepsize
|
||||
zshift(float) [um]: shift in z
|
||||
angle(float) [deg]: rotation angle (will rotate first)
|
||||
@@ -73,7 +75,7 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
>>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=10, ceny=0, zshift=0, angle=0, step=2, exp_time=0.01)
|
||||
"""
|
||||
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
|
||||
self.axis = []
|
||||
self.fovx = fovx
|
||||
self.fovy = fovy
|
||||
@@ -299,6 +301,7 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
yield from self.stage()
|
||||
yield from self.run_baseline_reading()
|
||||
yield from self._prepare_setup_part2()
|
||||
yield from self.pre_scan()
|
||||
yield from self.scan_core()
|
||||
yield from self.finalize()
|
||||
yield from self.unstage()
|
||||
|
||||
@@ -193,14 +193,15 @@ The basic scan function can be called by `scans.flomni_fermat_scan()` and offers
|
||||
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
|
||||
| cenx (float) | center position in x |
|
||||
| ceny (float) | center position in y |
|
||||
| exp_time (float) | exposure time |
|
||||
| exp_time (float) | exposure time per frame |
|
||||
| frames_per_trigger(int) | Number of burst frames per position |
|
||||
| step (float) | stepsize |
|
||||
| zshift (float) | shift in z |
|
||||
| angle (float) | rotation angle (will rotate first) |
|
||||
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
|
||||
|
||||
Example:
|
||||
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)`
|
||||
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
|
||||
|
||||
#### Overview of the alignment steps
|
||||
|
||||
|
||||
@@ -327,14 +327,15 @@ The basic scan function can be called by `scans.omny_fermat_scan()` and offers a
|
||||
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
|
||||
| cenx (float) | center position in x |
|
||||
| ceny (float) | center position in y |
|
||||
| exp_time (float) | exposure time |
|
||||
| exp_time (float) | exposure time per frame |
|
||||
| frames_per_trigger(int) | Number of burst frames per position |
|
||||
| step (float) | stepsize |
|
||||
| zshift (float) | shift in z |
|
||||
| angle (float) | rotation angle (will rotate first) |
|
||||
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
|
||||
|
||||
Example:
|
||||
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)`
|
||||
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
|
||||
|
||||
#### Overview of the alignment steps
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
|
||||
name = "csaxs_bec"
|
||||
version = "0.0.0"
|
||||
description = "The cSAXS plugin repository for BEC"
|
||||
requires-python = ">=3.10"
|
||||
requires-python = ">=3.11"
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Programming Language :: Python :: 3",
|
||||
|
||||
@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
|
||||
lamni = LamNI(client)
|
||||
align = XrayEyeAlign(client, lamni)
|
||||
with mock.patch(
|
||||
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
|
||||
"csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
|
||||
) as epics_put_mock:
|
||||
align.save_frame()
|
||||
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
|
||||
|
||||
|
||||
def test_update_frame(bec_client_mock):
|
||||
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put"
|
||||
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get"
|
||||
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen"
|
||||
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
|
||||
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
|
||||
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
|
||||
client = bec_client_mock
|
||||
client.device_manager.devices.xeye = DeviceBase(
|
||||
name="xeye",
|
||||
|
||||
@@ -217,6 +217,16 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
|
||||
assert not mcs._start_monitor_async_data_emission.is_set()
|
||||
|
||||
|
||||
def test_mcs_on_stop(mock_mcs_csaxs: MCSCardCSAXS):
|
||||
"""Test that on stop sets the omit_mca_callbacks flag. Also test that on stage clears the omit_mca_callbacks flag."""
|
||||
mcs = mock_mcs_csaxs
|
||||
assert mcs._omit_mca_callbacks.is_set() is False
|
||||
mcs.stop()
|
||||
assert mcs._omit_mca_callbacks.is_set() is True
|
||||
mcs.stage()
|
||||
assert mcs._omit_mca_callbacks.is_set() is False
|
||||
|
||||
|
||||
def test_mcs_recovery(mock_mcs_csaxs: MCSCardCSAXS):
|
||||
mcs = mock_mcs_csaxs
|
||||
# Simulate ongoing acquisition
|
||||
|
||||
190
tests/tests_devices/test_panda.py
Normal file
190
tests/tests_devices/test_panda.py
Normal file
@@ -0,0 +1,190 @@
|
||||
"""Module for testing the PandaBoxCSAXS and PandaBoxOMNY devices."""
|
||||
|
||||
# pylint: skip-file
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from ophyd import Staged
|
||||
|
||||
from csaxs_bec.devices.panda_box.panda_box import PandaBoxCSAXS
|
||||
from csaxs_bec.devices.panda_box.panda_box_omny import PandaBoxOMNY
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def panda_omny():
|
||||
dev_name = "panda_omny"
|
||||
dev = PandaBoxOMNY(
|
||||
name=dev_name,
|
||||
host="omny-panda-box.psi.ch",
|
||||
signal_alias={
|
||||
"FMC_IN.VAL1.Min": "cap_voltage_fzp_y_min",
|
||||
"FMC_IN.VAL1.Max": "cap_voltage_fzp_y_max",
|
||||
"FMC_IN.VAL1.Mean": "cap_voltage_fzp_y_mean",
|
||||
"FMC_IN.VAL2.Min": "cap_voltage_fzp_x_min",
|
||||
"FMC_IN.VAL2.Max": "cap_voltage_fzp_x_max",
|
||||
"FMC_IN.VAL2.Mean": "cap_voltage_fzp_x_mean",
|
||||
},
|
||||
)
|
||||
yield dev
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def panda_csaxs():
|
||||
dev_name = "panda_csaxs"
|
||||
dev = PandaBoxCSAXS(name=dev_name, host="csaxs-panda-box.psi.ch")
|
||||
yield dev
|
||||
|
||||
|
||||
def test_panda_omny(panda_omny):
|
||||
assert panda_omny.name == "panda_omny"
|
||||
assert panda_omny.host == "omny-panda-box.psi.ch"
|
||||
|
||||
all_signal_names = [name for name, _ in panda_omny.data.signals]
|
||||
# Check that the signal aliases are correctly set up
|
||||
assert "cap_voltage_fzp_y_min" in all_signal_names
|
||||
assert "cap_voltage_fzp_y_max" in all_signal_names
|
||||
assert "cap_voltage_fzp_y_mean" in all_signal_names
|
||||
assert "cap_voltage_fzp_x_min" in all_signal_names
|
||||
assert "cap_voltage_fzp_x_max" in all_signal_names
|
||||
assert "cap_voltage_fzp_x_mean" in all_signal_names
|
||||
|
||||
# Check that the original signal names are not present
|
||||
assert "FMC_IN.VAL1.Min" not in all_signal_names
|
||||
assert "FMC_IN.VAL1.Max" not in all_signal_names
|
||||
assert "FMC_IN.VAL1.Mean" not in all_signal_names
|
||||
assert "FMC_IN.VAL2.Min" not in all_signal_names
|
||||
assert "FMC_IN.VAL2.Max" not in all_signal_names
|
||||
assert "FMC_IN.VAL2.Mean" not in all_signal_names
|
||||
|
||||
assert panda_omny._acquisition_group == "burst"
|
||||
assert panda_omny._timeout_on_completed == 10
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scan_type, frames_per_trigger, expected_acquisition_group",
|
||||
[
|
||||
("fly", 1, "fly"),
|
||||
("fly", 5, "fly"),
|
||||
("step", 10, "burst"),
|
||||
("step", 1, "monitored"), # Default case
|
||||
],
|
||||
)
|
||||
def test_panda_omny_stage(panda_omny, scan_type, frames_per_trigger, expected_acquisition_group):
|
||||
# Check that the stage signal is present and has the correct PV
|
||||
assert len(panda_omny._status_callbacks) == 0
|
||||
|
||||
panda_omny.scan_info.msg.scan_type = scan_type
|
||||
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
|
||||
panda_omny.stage()
|
||||
|
||||
assert panda_omny._acquisition_group == expected_acquisition_group
|
||||
assert panda_omny.staged == Staged.yes
|
||||
|
||||
|
||||
def test_panda_omny_complete(panda_omny):
|
||||
"""Test the on_complete method of the PandaBoxCSAXS device."""
|
||||
panda_omny.scan_info.msg.num_points = 1
|
||||
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
|
||||
|
||||
panda_omny._timeout_on_completed = 0.5 # Set a short timeout for testing
|
||||
|
||||
def _mock_return_captured(*args, **kwargs):
|
||||
return ["=0"]
|
||||
|
||||
# Timeout Error on complete
|
||||
with (
|
||||
mock.patch.object(panda_omny, "send_raw", side_effect=_mock_return_captured),
|
||||
mock.patch.object(panda_omny, "_disarm", return_value=None) as mock_disarm,
|
||||
):
|
||||
status = panda_omny.on_complete()
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
|
||||
with pytest.raises(TimeoutError):
|
||||
status.wait(timeout=4)
|
||||
mock_disarm.assert_called_once()
|
||||
|
||||
# Successful complete
|
||||
panda_omny._timeout_on_completed = 5
|
||||
with (
|
||||
mock.patch.object(panda_omny, "send_raw", side_effect=[["=0"], ["=0"], ["=1"]]),
|
||||
mock.patch.object(panda_omny, "_disarm", return_value=None) as mock_disarm,
|
||||
):
|
||||
status = panda_omny.on_complete()
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
|
||||
status.wait(timeout=4)
|
||||
mock_disarm.assert_called_once()
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
|
||||
|
||||
def test_panda_csaxs(panda_csaxs):
|
||||
assert panda_csaxs.name == "panda_csaxs"
|
||||
assert panda_csaxs.host == "csaxs-panda-box.psi.ch"
|
||||
|
||||
assert panda_csaxs._acquisition_group == "burst"
|
||||
assert panda_csaxs._timeout_on_completed == 10
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"scan_type, frames_per_trigger, expected_acquisition_group",
|
||||
[
|
||||
("fly", 1, "fly"),
|
||||
("fly", 5, "fly"),
|
||||
("step", 10, "burst"),
|
||||
("step", 1, "monitored"), # Default case
|
||||
],
|
||||
)
|
||||
def test_panda_csaxs_stage(panda_csaxs, scan_type, frames_per_trigger, expected_acquisition_group):
|
||||
"""Test the on_stage method of the PandaBoxCSAXS device for different scan types and frames per trigger."""
|
||||
assert len(panda_csaxs._status_callbacks) == 0
|
||||
|
||||
panda_csaxs.scan_info.msg.scan_type = scan_type
|
||||
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
|
||||
panda_csaxs.stage()
|
||||
|
||||
assert panda_csaxs._acquisition_group == expected_acquisition_group
|
||||
assert panda_csaxs.staged == Staged.yes
|
||||
|
||||
|
||||
def test_panda_csaxs_complete(panda_csaxs):
|
||||
"""Test the on_complete method of the PandaBoxCSAXS device."""
|
||||
panda_csaxs.scan_info.msg.num_points = 1
|
||||
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
|
||||
|
||||
panda_csaxs._timeout_on_completed = 0.5 # Set a short timeout for testing
|
||||
|
||||
def _mock_return_captured(*args, **kwargs):
|
||||
return ["=0"]
|
||||
|
||||
# Timeout Error on complete
|
||||
with (
|
||||
mock.patch.object(panda_csaxs, "send_raw", side_effect=_mock_return_captured),
|
||||
mock.patch.object(panda_csaxs, "_disarm", return_value=None) as mock_disarm,
|
||||
):
|
||||
status = panda_csaxs.on_complete()
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
|
||||
with pytest.raises(TimeoutError):
|
||||
status.wait(timeout=4)
|
||||
mock_disarm.assert_called_once()
|
||||
|
||||
# Successful complete
|
||||
panda_csaxs._timeout_on_completed = 5
|
||||
with (
|
||||
mock.patch.object(panda_csaxs, "send_raw", side_effect=[["=0"], ["=0"], ["=1"]]),
|
||||
mock.patch.object(panda_csaxs, "_disarm", return_value=None) as mock_disarm,
|
||||
):
|
||||
status = panda_csaxs.on_complete()
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
|
||||
status.wait(timeout=4)
|
||||
mock_disarm.assert_called_once()
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
@@ -302,6 +302,36 @@ def device_manager_mock():
|
||||
action="set",
|
||||
parameter={"value": 2.1508313829565293},
|
||||
),
|
||||
messages.DeviceInstructionMessage(
|
||||
metadata={
|
||||
"readout_priority": "monitored",
|
||||
"RID": "1234",
|
||||
"device_instr_id": "diid",
|
||||
},
|
||||
device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
|
||||
action="pre_scan",
|
||||
parameter={},
|
||||
),
|
||||
messages.DeviceInstructionMessage(
|
||||
metadata={
|
||||
"readout_priority": "monitored",
|
||||
"RID": "1234",
|
||||
"device_instr_id": "diid",
|
||||
},
|
||||
device="rtx",
|
||||
action="set",
|
||||
parameter={"value": 1.3681828686580249},
|
||||
),
|
||||
messages.DeviceInstructionMessage(
|
||||
metadata={
|
||||
"readout_priority": "monitored",
|
||||
"RID": "1234",
|
||||
"device_instr_id": "diid",
|
||||
},
|
||||
device="rty",
|
||||
action="set",
|
||||
parameter={"value": 2.1508313829565293},
|
||||
),
|
||||
None,
|
||||
messages.DeviceInstructionMessage(
|
||||
metadata={
|
||||
|
||||
Reference in New Issue
Block a user