Compare commits

..

23 Commits

Author SHA1 Message Date
x01dc
c1559e4c0b fixed ascii logo flomni and omny
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m54s
2026-03-06 14:35:04 +01:00
x01dc
8013a6305d added beck startup
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m53s
CI for csaxs_bec / test (pull_request) Failing after 40s
2026-03-06 14:32:47 +01:00
x01dc
393b91c2ac now reading encoder values for axes with encoder
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m57s
CI for csaxs_bec / test (pull_request) Failing after 1m56s
2026-03-06 14:31:23 +01:00
0f072a786e test: add tests for panda, fix tests for fermat scan
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-02 13:15:53 +01:00
05a1e3d8be refactor: cleanup docs and logs for most relevant devices 2026-03-02 13:15:53 +01:00
e9fd9084b8 refactor(csaxsdlpca200): cleanup docs 2026-03-02 13:15:53 +01:00
40ef387134 fix(panda): make complete asyncronous for PandaBoxOmny 2026-03-02 13:15:53 +01:00
x12sa
6ed84664f2 added docs for burst acquisition 2026-03-02 13:15:53 +01:00
x12sa
e5e3343da7 post startup script sessions for all setups, and required modifications 2026-03-02 13:15:53 +01:00
x12sa
c8866faccc logic for gain setting and readback for bpm amplifiers 2026-03-02 13:15:53 +01:00
x12sa
3b561c251c fix(config): remove panda test config 2026-03-02 13:15:53 +01:00
x12sa
bc187040ad refactor(panda-box): add pandaboxomny and refactor panda_box main integration 2026-03-02 13:15:53 +01:00
x12sa
efd27a27e8 fix(ferma-scan): fix flomni, lamni and omny fermat scans. add exp_time and frames_per_trigger 2026-03-02 13:15:53 +01:00
x12sa
7096ef3323 refactor(config): Update configs for bl_general and flomni 2026-03-02 13:15:53 +01:00
13378f24dd refactor(panda-box): refactor Pandabox, moving logic to base class 2026-03-02 13:15:53 +01:00
x01dc
f5b898ea1c feat: add panda box csaxs integration 2026-03-01 18:15:57 +01:00
3d62bea04b Update repo with template version v1.2.8
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m55s
CI for csaxs_bec / test (push) Successful in 1m54s
2026-02-27 16:25:22 +01:00
1518845d25 resolve merge conflicts 2026-02-27 16:25:22 +01:00
ff3b6686db Update repo with template version v1.2.7 2026-02-27 16:25:22 +01:00
afdc64e296 fix(rio): fix rio cached readings
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m40s
CI for csaxs_bec / test (push) Successful in 1m36s
2026-02-26 16:15:29 +01:00
bc31c00e1f fix(tests): x_ray_eye_align correct imports fixed after refactor of LamNI
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m36s
CI for csaxs_bec / test (push) Successful in 1m37s
2026-02-23 13:25:09 +01:00
x01dc
38671f074e minor printout fix
Some checks failed
CI for csaxs_bec / test (pull_request) Failing after 1m30s
CI for csaxs_bec / test (push) Failing after 1m32s
2026-02-23 12:44:04 +01:00
x01dc
92e39a5f75 minor adjmustment 2026-02-23 12:35:56 +01:00
43 changed files with 1273 additions and 170 deletions

View File

@@ -2,7 +2,7 @@
# It is needed to track the repo template version, and editing may break things. # It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates. # This file will be overwritten by copier on template updates.
_commit: v1.2.2 _commit: v1.2.8
_src_path: https://github.com/bec-project/plugin_copier_template.git _src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false make_commit: false
project_name: csaxs_bec project_name: csaxs_bec

View File

@@ -28,7 +28,7 @@ on:
description: "Python version to use" description: "Python version to use"
required: false required: false
type: string type: string
default: "3.11" default: "3.12"
permissions: permissions:
pull-requests: write pull-requests: write
@@ -44,7 +44,19 @@ jobs:
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}" python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/csaxs_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./csaxs_bec
- name: Lint for merge conflicts from template updates
shell: bash
# Find all Copier conflicts except this line
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
- name: Checkout BEC Core - name: Checkout BEC Core
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec
@@ -55,13 +67,6 @@ jobs:
- name: Checkout BEC Widgets - name: Checkout BEC Widgets
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/csaxs_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./csaxs_bec
- name: Install dependencies - name: Install dependencies
shell: bash shell: bash
run: | run: |

View File

@@ -0,0 +1,62 @@
name: Create template upgrade PR for csaxs_bec
on:
workflow_dispatch:
permissions:
pull-requests: write
jobs:
create_update_branch_and_pr:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install tools
run: |
pip install copier PySide6
- name: Checkout
uses: actions/checkout@v4
- name: Perform update
run: |
git config --global user.email "bec_ci_staging@psi.ch"
git config --global user.name "BEC automated CI"
branch="chore/update-template-$(python -m uuid)"
echo "switching to branch $branch"
git checkout -b $branch
echo "Running copier update..."
output="$(copier update --trust --defaults --conflict inline 2>&1)"
echo "$output"
msg="$(printf '%s\n' "$output" | head -n 1)"
if ! grep -q "make_commit: true" .copier-answers.yml ; then
echo "Autocommit not made, committing..."
git add -A
git commit -a -m "$msg"
fi
if diff-index --quiet HEAD ; then
echo "No changes detected"
exit 0
fi
git push -u origin $branch
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"Template: $(echo $msg)\",
\"body\": \"This PR was created by Gitea Actions\",
\"head\": \"$(echo $branch)\",
\"base\": \"main\"
}"

View File

@@ -1,20 +0,0 @@
include:
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "csaxs"
target: "csaxs_bec"
branch: $CHILD_PIPELINE_BRANCH
pages:
stage: Deploy
needs: []
variables:
TARGET_BRANCH: $CI_COMMIT_REF_NAME
rules:
- if: "$CI_COMMIT_TAG != null"
variables:
TARGET_BRANCH: $CI_COMMIT_TAG
- if: '$CI_COMMIT_REF_NAME == "main" && $CI_PROJECT_PATH == "bec/csaxs_bec"'
script:
- curl -X POST -d "branches=$CI_COMMIT_REF_NAME" -d "token=$RTD_TOKEN" https://readthedocs.org/api/v2/webhook/sls-csaxs/270162/

View File

@@ -210,13 +210,11 @@ class LamNI(LamNIOpticsMixin):
self.feedback_status() self.feedback_status()
def feedback_status(self): def feedback_status(self):
if self.device_manager.devices.rtx.controller.feedback_is_running(): self.device_manager.devices.rtx.controller.show_feedback_status()
print("The rt feedback is \x1b[92mrunning\x1b[0m.")
else:
print("The rt feedback is \x1b[91mNOT\x1b[0m running.")
def show_interferometer_positions(self): def show_interferometer_positions(self):
self.device_manager.devices.rtx.controller.show_interferometer_positions() self.device_manager.devices.rtx.controller.show_feedback_status()
def show_signal_strength(self): def show_signal_strength(self):
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer() self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()

View File

@@ -9,9 +9,11 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_put, fshclose
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class LamNIInitError(Exception): class LamNIInitError(Exception):
pass pass

View File

@@ -0,0 +1,442 @@
"""
csaxs_dlpca200.py
=================
BEC control script for FEMTO DLPCA-200 Variable Gain Low Noise Current Amplifiers
connected to Galil RIO digital outputs.
DLPCA-200 Remote Control (datasheet page 4)
-------------------------------------------
Sub-D pin -> function:
Pin 10 -> gain LSB (digital out channel, index 0 in bit-tuple)
Pin 11 -> gain MID (digital out channel, index 1 in bit-tuple)
Pin 12 -> gain MSB (digital out channel, index 2 in bit-tuple)
Pin 13 -> coupling LOW = AC, HIGH = DC
Pin 14 -> speed mode HIGH = low noise (Pin14=1), LOW = high speed (Pin14=0)
Gain truth table (MSB, MID, LSB):
0,0,0 -> low-noise: 1e3 high-speed: 1e5
0,0,1 -> low-noise: 1e4 high-speed: 1e6
0,1,0 -> low-noise: 1e5 high-speed: 1e7
0,1,1 -> low-noise: 1e6 high-speed: 1e8
1,0,0 -> low-noise: 1e7 high-speed: 1e9
1,0,1 -> low-noise: 1e8 high-speed: 1e10
1,1,0 -> low-noise: 1e9 high-speed: 1e11
Strategy: prefer low-noise mode (1e3-1e9). For 1e10 and 1e11,
automatically fall back to high-speed mode.
Device wiring example (galilrioesxbox):
bpm4: Pin10->ch0, Pin11->ch1, Pin12->ch2, Pin13->ch3, Pin14->ch4
bim: Pin10->ch6, Pin11->ch7, Pin12->ch8, Pin13->ch9, Pin14->ch10
Usage examples
--------------
csaxs_amp = cSAXSDLPCA200(client)
csaxs_amp.set_gain("bpm4", 1e7) # low-noise if possible
csaxs_amp.set_gain("bim", 1e10) # auto high-speed
csaxs_amp.set_coupling("bpm4", "DC")
csaxs_amp.set_coupling("bim", "AC")
csaxs_amp.info("bpm4") # print current settings
csaxs_amp.info_all() # print all configured amplifiers
"""
import builtins
from bec_lib import bec_logger
logger = bec_logger.logger
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
# ---------------------------------------------------------------------------
# Amplifier registry
# ---------------------------------------------------------------------------
# Each entry describes one DLPCA-200 amplifier connected to a Galil RIO.
#
# Keys inside "channels":
# gain_lsb -> digital output channel number wired to DLPCA-200 Pin 10
# gain_mid -> digital output channel number wired to DLPCA-200 Pin 11
# gain_msb -> digital output channel number wired to DLPCA-200 Pin 12
# coupling -> digital output channel number wired to DLPCA-200 Pin 13
# speed_mode -> digital output channel number wired to DLPCA-200 Pin 14
#
# To add a new amplifier, simply extend this dict.
# ---------------------------------------------------------------------------
DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
"bpm4": {
"rio_device": "galilrioesxbox",
"description": "Beam Position Monitor 4 current amplifier",
"channels": {
"gain_lsb": 0, # Pin 10 -> Galil ch0
"gain_mid": 1, # Pin 11 -> Galil ch1
"gain_msb": 2, # Pin 12 -> Galil ch2
"coupling": 3, # Pin 13 -> Galil ch3
"speed_mode": 4, # Pin 14 -> Galil ch4
},
},
"bim": {
"rio_device": "galilrioesxbox",
"description": "Beam Intensity Monitor current amplifier",
"channels": {
"gain_lsb": 6, # Pin 10 -> Galil ch6
"gain_mid": 7, # Pin 11 -> Galil ch7
"gain_msb": 8, # Pin 12 -> Galil ch8
"coupling": 9, # Pin 13 -> Galil ch9
"speed_mode": 10, # Pin 14 -> Galil ch10
},
},
}
# ---------------------------------------------------------------------------
# DLPCA-200 gain encoding tables
# ---------------------------------------------------------------------------
# (msb, mid, lsb) -> gain in V/A
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
(0, 0, 1): int(1e4),
(0, 1, 0): int(1e5),
(0, 1, 1): int(1e6),
(1, 0, 0): int(1e7),
(1, 0, 1): int(1e8),
(1, 1, 0): int(1e9),
}
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
(0, 0, 0): int(1e5),
(0, 0, 1): int(1e6),
(0, 1, 0): int(1e7),
(0, 1, 1): int(1e8),
(1, 0, 0): int(1e9),
(1, 0, 1): int(1e10),
(1, 1, 0): int(1e11),
}
# Inverse maps: gain -> (msb, mid, lsb, low_noise_flag)
# low_noise_flag: True = Pin14 HIGH, False = Pin14 LOW
_GAIN_TO_BITS: dict[int, tuple] = {}
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
_GAIN_TO_BITS[_gain] = (*_bits, True)
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
_GAIN_TO_BITS[_gain] = (*_bits, False)
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class cSAXSDLPCA200Error(Exception):
pass
class cSAXSDLPCA200:
"""
Control class for FEMTO DLPCA-200 current amplifiers connected via Galil RIO
digital outputs in a BEC environment.
Supports:
- Forward control: set_gain(), set_coupling()
- Readback reporting: info(), info_all(), read_settings()
- Robust error handling and logging following cSAXS conventions.
"""
TAG = "[DLPCA200]"
def __init__(self, client, config: dict | None = None) -> None:
"""
Parameters
----------
client : BEC client object (passed through for future use)
config : optional override for DLPCA200_AMPLIFIER_CONFIG.
Falls back to the module-level dict if not provided.
"""
self.client = client
self._config: dict[str, dict] = config if config is not None else DLPCA200_AMPLIFIER_CONFIG
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _require_dev(self) -> None:
if dev is None:
raise cSAXSDLPCA200Error(
f"{self.TAG} BEC 'dev' namespace is not available in this session."
)
def _get_cfg(self, amp_name: str) -> dict:
"""Return config dict for a named amplifier, raising on unknown names."""
if amp_name not in self._config:
known = ", ".join(sorted(self._config.keys()))
raise cSAXSDLPCA200Error(f"{self.TAG} Unknown amplifier '{amp_name}'. Known: [{known}]")
return self._config[amp_name]
def _get_rio(self, amp_name: str):
"""Return the live RIO device object for a given amplifier."""
self._require_dev()
cfg = self._get_cfg(amp_name)
rio_name = cfg["rio_device"]
try:
rio = getattr(dev, rio_name)
except AttributeError:
raise cSAXSDLPCA200Error(f"{self.TAG} RIO device '{rio_name}' not found in BEC 'dev'.")
return rio
def _dout_get(self, rio, ch: int) -> int:
"""Read one digital output channel (returns 0 or 1)."""
attr = getattr(rio.digital_out, f"ch{ch}")
val = attr.get()
return int(val)
def _dout_set(self, rio, ch: int, value: bool) -> None:
"""Write one digital output channel (True=HIGH=1, False=LOW=0)."""
attr = getattr(rio.digital_out, f"ch{ch}")
attr.set(value)
def _read_gain_bits(self, amp_name: str) -> tuple[int, int, int, int]:
"""
Read current gain bit-state from hardware.
Returns
-------
(msb, mid, lsb, speed_mode)
speed_mode: 1 = low-noise (Pin14=HIGH), 0 = high-speed (Pin14=LOW)
"""
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
msb = self._dout_get(rio, ch["gain_msb"])
mid = self._dout_get(rio, ch["gain_mid"])
lsb = self._dout_get(rio, ch["gain_lsb"])
speed_mode = self._dout_get(rio, ch["speed_mode"])
return msb, mid, lsb, speed_mode
def _decode_gain(self, msb: int, mid: int, lsb: int, speed_mode: int) -> int | None:
"""
Decode hardware bit-state into gain value (V/A).
speed_mode=1 -> low-noise table, speed_mode=0 -> high-speed table.
Returns None if the bit combination is not in the table.
"""
bits = (msb, mid, lsb)
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
# ------------------------------------------------------------------
# Public API - control
# ------------------------------------------------------------------
def set_gain(self, amp_name: str, gain: float, force_high_speed: bool = False) -> None:
"""
Set the transimpedance gain of a DLPCA-200 amplifier.
The method automatically selects low-noise mode (Pin14=HIGH) whenever
the requested gain is achievable in low-noise mode (1e3 - 1e9 V/A).
For gains of 1e10 and 1e11 V/A, high-speed mode is used automatically.
Parameters
----------
amp_name : str
Amplifier name as defined in DLPCA200_AMPLIFIER_CONFIG (e.g. "bpm4").
gain : float or int
Target gain in V/A. Must be one of:
1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11.
force_high_speed : bool, optional
If True, force high-speed (low-noise=False) mode even for gains
below 1e10. Default: False (prefer low-noise).
Examples
--------
csaxs_amp.set_gain("bpm4", 1e7) # low-noise mode (automatic)
csaxs_amp.set_gain("bim", 1e10) # high-speed mode (automatic)
csaxs_amp.set_gain("bpm4", 1e7, force_high_speed=True) # override to high-speed
"""
gain_int = int(gain)
if gain_int not in _GAIN_TO_BITS:
valid_str = ", ".join(
f"1e{int(round(__import__('math').log10(g)))}" for g in VALID_GAINS
)
raise cSAXSDLPCA200Error(
f"{self.TAG} Invalid gain {gain:.2e} V/A for '{amp_name}'. "
f"Valid values: {valid_str}"
)
msb, mid, lsb, low_noise_preferred = _GAIN_TO_BITS[gain_int]
# Apply force_high_speed override
if force_high_speed and low_noise_preferred:
# Check if this gain is achievable in high-speed mode
hs_entry = next(
(bits for bits, g in _GAIN_BITS_HIGH_SPEED.items() if g == gain_int), None
)
if hs_entry is None:
raise cSAXSDLPCA200Error(
f"{self.TAG} Gain {gain:.2e} V/A is not achievable in high-speed mode "
f"for '{amp_name}'."
)
msb, mid, lsb = hs_entry
low_noise_preferred = False
use_low_noise = low_noise_preferred and not force_high_speed
try:
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
self._dout_set(rio, ch["gain_msb"], bool(msb))
self._dout_set(rio, ch["gain_mid"], bool(mid))
self._dout_set(rio, ch["gain_lsb"], bool(lsb))
self._dout_set(rio, ch["speed_mode"], use_low_noise) # True=low-noise
mode_str = "low-noise" if use_low_noise else "high-speed"
logger.info(
f"{self.TAG} [{amp_name}] gain set to {gain_int:.2e} V/A "
f"({mode_str} mode, bits MSB={msb} MID={mid} LSB={lsb})"
)
print(
f"{amp_name}: gain -> {gain_int:.2e} V/A [{mode_str}] "
f"(bits: MSB={msb} MID={mid} LSB={lsb})"
)
except cSAXSDLPCA200Error:
raise
except Exception as exc:
raise cSAXSDLPCA200Error(
f"{self.TAG} Failed to set gain on '{amp_name}': {exc}"
) from exc
def set_coupling(self, amp_name: str, coupling: str) -> None:
"""
Set AC or DC coupling on a DLPCA-200 amplifier.
Parameters
----------
amp_name : str
Amplifier name (e.g. "bpm4", "bim").
coupling : str
"AC" or "DC" (case-insensitive).
DC -> Pin13 HIGH, AC -> Pin13 LOW.
Examples
--------
csaxs_amp.set_coupling("bpm4", "DC")
csaxs_amp.set_coupling("bim", "AC")
"""
coupling_upper = coupling.strip().upper()
if coupling_upper not in ("AC", "DC"):
raise cSAXSDLPCA200Error(
f"{self.TAG} Invalid coupling '{coupling}' for '{amp_name}'. " f"Use 'AC' or 'DC'."
)
pin13_high = coupling_upper == "DC"
try:
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
self._dout_set(rio, ch["coupling"], pin13_high)
logger.info(f"{self.TAG} [{amp_name}] coupling set to {coupling_upper}")
print(f"{amp_name}: coupling -> {coupling_upper}")
except cSAXSDLPCA200Error:
raise
except Exception as exc:
raise cSAXSDLPCA200Error(
f"{self.TAG} Failed to set coupling on '{amp_name}': {exc}"
) from exc
# ------------------------------------------------------------------
# Public API - readback / reporting
# ------------------------------------------------------------------
def read_settings(self, amp_name: str) -> dict:
"""
Read back the current settings from hardware digital outputs.
Returns
-------
dict with keys:
"amp_name" : str
"gain" : int or None - gain in V/A (None if unknown bit pattern)
"mode" : str - "low-noise" or "high-speed"
"coupling" : str - "AC" or "DC"
"bits" : dict - raw bit values {msb, mid, lsb, speed_mode, coupling}
"""
rio = self._get_rio(amp_name)
ch = self._get_cfg(amp_name)["channels"]
msb = self._dout_get(rio, ch["gain_msb"])
mid = self._dout_get(rio, ch["gain_mid"])
lsb = self._dout_get(rio, ch["gain_lsb"])
speed_mode = self._dout_get(rio, ch["speed_mode"])
coupling_bit = self._dout_get(rio, ch["coupling"])
gain = self._decode_gain(msb, mid, lsb, speed_mode)
mode = "low-noise" if speed_mode else "high-speed"
coupling = "DC" if coupling_bit else "AC"
return {
"amp_name": amp_name,
"gain": gain,
"mode": mode,
"coupling": coupling,
"bits": {
"msb": msb,
"mid": mid,
"lsb": lsb,
"speed_mode": speed_mode,
"coupling": coupling_bit,
},
}
def info(self, amp_name: str) -> None:
"""
Print a plain summary of the current settings for one amplifier.
Example output
--------------
Amplifier : bpm4
Description : Beam Position Monitor 4 current amplifier
RIO device : galilrioesxbox
Gain : 1.00e+07 V/A
Mode : low-noise
Coupling : DC
Raw bits : MSB=1 MID=0 LSB=0 speed=1 coup=1
"""
cfg = self._get_cfg(amp_name)
try:
s = self.read_settings(amp_name)
except Exception as exc:
print(f"{self.TAG} [{amp_name}] Could not read settings: {exc}")
return
gain_str = (
f"{s['gain']:.2e} V/A" if s["gain"] is not None else "UNKNOWN (invalid bit pattern)"
)
bits = s["bits"]
print(f" {'Amplifier':<12}: {amp_name}")
print(f" {'Description':<12}: {cfg.get('description', '')}")
print(f" {'RIO device':<12}: {cfg['rio_device']}")
print(f" {'Gain':<12}: {gain_str}")
print(f" {'Mode':<12}: {s['mode']}")
print(f" {'Coupling':<12}: {s['coupling']}")
print(
f" {'Raw bits':<12}: MSB={bits['msb']} MID={bits['mid']} LSB={bits['lsb']} speed={bits['speed_mode']} coup={bits['coupling']}"
)
def info_all(self) -> None:
"""
Print a plain summary for ALL configured amplifiers.
"""
print("\nDLPCA-200 Amplifier Status Report")
print("-" * 40)
for amp_name in sorted(self._config.keys()):
self.info(amp_name)
print()
def list_amplifiers(self) -> list[str]:
"""Return sorted list of configured amplifier names."""
return sorted(self._config.keys())

View File

@@ -41,8 +41,10 @@ import builtins
if builtins.__dict__.get("bec") is not None: if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv") scans = builtins.__dict__.get("scans")
umvr = builtins.__dict__.get("umvr")
def umv(*args):
return scans.umv(*args, relative=False)
class cSAXSFilterTransmission: class cSAXSFilterTransmission:
""" """

View File

@@ -8,11 +8,14 @@ from bec_lib import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
# Pull BEC globals if present # Pull BEC globals if present
bec = builtins.__dict__.get("bec") if builtins.__dict__.get("bec") is not None:
dev = builtins.__dict__.get("dev") bec = builtins.__dict__.get("bec")
umv = builtins.__dict__.get("umv") dev = builtins.__dict__.get("dev")
umvr = builtins.__dict__.get("umvr") scans = builtins.__dict__.get("scans")
def umv(*args):
return scans.umv(*args, relative=False)
class cSAXSInitSmaractStagesError(Exception): class cSAXSInitSmaractStagesError(Exception):
pass pass
@@ -383,7 +386,6 @@ class cSAXSInitSmaractStages:
if not self._yesno("Proceed with the motions listed above?", "y"): if not self._yesno("Proceed with the motions listed above?", "y"):
logger.info("[cSAXS] Motion to initial position aborted by user.") logger.info("[cSAXS] Motion to initial position aborted by user.")
return return
# --- Execution phase (SIMULTANEOUS MOTION) --- # --- Execution phase (SIMULTANEOUS MOTION) ---
if umv is None: if umv is None:
logger.error("[cSAXS] 'umv' is not available in this session.") logger.error("[cSAXS] 'umv' is not available in this session.")

View File

@@ -22,8 +22,10 @@ logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None: if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv") scans = builtins.__dict__.get("scans")
umvr = builtins.__dict__.get("umvr")
def umv(*args):
return scans.umv(*args, relative=False)
class FlomniToolsError(Exception): class FlomniToolsError(Exception):

View File

@@ -7,8 +7,10 @@ from bec_widgets.cli.client import BECDockArea
if builtins.__dict__.get("bec") is not None: if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv") scans = builtins.__dict__.get("scans")
umvr = builtins.__dict__.get("umvr")
def umv(*args):
return scans.umv(*args, relative=False)
class flomniGuiToolsError(Exception): class flomniGuiToolsError(Exception):
@@ -105,11 +107,12 @@ class flomniGuiTools:
idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox") idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
text = ( text = (
"<pre>" "<pre>"
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n" + "██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗\n"
+ "/ .-'| |' .-. '| `.' || ,'.| || | \n" + "██╔══██╗██╔════╝██╔════╝ ██╔════╝██║ ██╔═══██╗████╗ ████║████╗ ██║██║\n"
+ "| `-,| || | | || |'.'| || |' ' || | \n" + "██████╔╝█████╗ ██║ █████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║██║\n"
+ "| .-'| |' '-' '| | | || | ` || | \n" + "██╔══██╗██╔══╝ ██║ ██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║██║\n"
+ "`--' `--' `-----' `--' `--'`--' `--'`--' \n" + "██████╔╝███████╗╚██████╗ ██║ ███████╗╚██████╔╝██║ ╚═╝ ██║██║ ╚████║██║\n"
+ "╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝\n"
+ "</pre>" + "</pre>"
) )
idle_text_box.set_html_text(text) idle_text_box.set_html_text(text)

View File

@@ -13,8 +13,10 @@ logger = bec_logger.logger
# import builtins to avoid linter errors # import builtins to avoid linter errors
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv") scans = builtins.__dict__.get("scans")
umvr = builtins.__dict__.get("umvr")
def umv(*args):
return scans.umv(*args, relative=False)
if TYPE_CHECKING: if TYPE_CHECKING:
from bec_ipython_client.plugins.flomni import Flomni from bec_ipython_client.plugins.flomni import Flomni

View File

@@ -7,8 +7,10 @@ from bec_widgets.cli.client import BECDockArea
if builtins.__dict__.get("bec") is not None: if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv") scans = builtins.__dict__.get("scans")
umvr = builtins.__dict__.get("umvr")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYGuiToolsError(Exception): class OMNYGuiToolsError(Exception):
@@ -81,16 +83,12 @@ class OMNYGuiTools:
pass pass
text = ( text = (
"<pre>" "<pre>"
+ " ,o888888o. ,8. ,8. b. 8 `8.`8888. ,8' \n" + "██████╗ ███████╗ ██████╗ ██████╗ ███╗ ███╗███╗ ██╗██╗ ██╗\n"
+ " . 8888 `88. ,888. ,888. 888o. 8 `8.`8888. ,8' \n" + "██╔══██╗██╔════╝██╔════╝ ██╔═══██╗████╗ ████║████╗ ██║╚██╗ ██╔╝\n"
+ ",8 8888 `8b .`8888. .`8888. Y88888o. 8 `8.`8888. ,8' \n" + "██████╔╝█████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║ ╚████╔╝ \n"
+ "88 8888 `8b ,8.`8888. ,8.`8888. .`Y888888o. 8 `8.`8888.,8' \n" + "██╔══██╗██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║ ╚██╔╝ \n"
+ "88 8888 88 ,8'8.`8888,8^8.`8888. 8o. `Y888888o. 8 `8.`88888' \n" + "██████╔╝███████╗╚██████╗ ╚██████╔╝██║ ╚═╝ ██║██║ ╚████║ ██║ \n"
+ "88 8888 88 ,8' `8.`8888' `8.`8888. 8`Y8o. `Y88888o8 `8. 8888 \n" + "╚═════╝ ╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝ \n"
+ "88 8888 ,8P ,8' `8.`88' `8.`8888. 8 `Y8o. `Y8888 `8 8888 \n"
+ "`8 8888 ,8P ,8' `8.`' `8.`8888. 8 `Y8o. `Y8 8 8888 \n"
+ " ` 8888 ,88' ,8' `8 `8.`8888. 8 `Y8o.` 8 8888 \n"
+ " `8888888P' ,8' ` `8.`8888. 8 `Yo 8 8888 \n"
+ "</pre>" + "</pre>"
) )
self.idle_text_box.set_html_text(text) self.idle_text_box.set_html_text(text)

View File

@@ -27,9 +27,10 @@ logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None: if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv") scans = builtins.__dict__.get("scans")
umvr = builtins.__dict__.get("umvr")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYInitError(Exception): class OMNYInitError(Exception):
pass pass

View File

@@ -16,8 +16,10 @@ from rich.table import Table
if builtins.__dict__.get("bec") is not None: if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv") scans = builtins.__dict__.get("scans")
umvr = builtins.__dict__.get("umvr")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYToolsError(Exception): class OMNYToolsError(Exception):

View File

@@ -16,8 +16,10 @@ from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fsh
if builtins.__dict__.get("bec") is not None: if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv") scans = builtins.__dict__.get("scans")
umvr = builtins.__dict__.get("umvr")
def umv(*args):
return scans.umv(*args, relative=False)
class OMNYTransferError(Exception): class OMNYTransferError(Exception):

View File

@@ -13,8 +13,10 @@ logger = bec_logger.logger
# import builtins to avoid linter errors # import builtins to avoid linter errors
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev") dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv") scans = builtins.__dict__.get("scans")
umvr = builtins.__dict__.get("umvr")
def umv(*args):
return scans.umv(*args, relative=False)
if TYPE_CHECKING: if TYPE_CHECKING:
from bec_ipython_client.plugins.omny import OMNY from bec_ipython_client.plugins.omny import OMNY

View File

@@ -30,29 +30,74 @@ logger = bec_logger.logger
logger.info("Using the cSAXS startup script.") logger.info("Using the cSAXS startup script.")
# pylint: disable=import-error
_args = _main_dict["args"]
_session_name = "cSAXS"
if _args.session.lower() == "lamni":
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
from csaxs_bec.bec_ipython_client.plugins.LamNI import *
_session_name = "LamNI"
lamni = LamNI(bec)
logger.success("LamNI session loaded.")
elif _args.session.lower() == "csaxs":
print("Loading cSAXS session")
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
logger.success("cSAXS session loaded.")
from csaxs_bec.bec_ipython_client.plugins.tool_box.debug_tools import DebugTools from csaxs_bec.bec_ipython_client.plugins.tool_box.debug_tools import DebugTools
debug = DebugTools() debug = DebugTools()
logger.success("Debug tools loaded. Use 'debug' to access them.") logger.success("Debug tools loaded. Use 'debug' to access them.")
# pylint: disable=import-error
_args = _main_dict["args"]
_session_name = "cSAXS"
print("Loading cSAXS session")
from csaxs_bec.bec_ipython_client.plugins.cSAXS.cSAXS import cSAXS
csaxs = cSAXS(bec)
logger.success("cSAXS session loaded.")
if _args.session.lower() == "lamni":
from csaxs_bec.bec_ipython_client.plugins.LamNI import LamNI
_session_name = "LamNI"
lamni = LamNI(bec)
logger.success("LamNI session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ██╗ █████╗ ███╗ ███╗███╗ ██╗██╗
██╔══██╗██╔════╝██╔════╝ ██║ ██╔══██╗████╗ ████║████╗ ██║██║
██████╔╝█████╗ ██║ ██║ ███████║██╔████╔██║██╔██╗ ██║██║
██╔══██╗██╔══╝ ██║ ██║ ██╔══██║██║╚██╔╝██║██║╚██╗██║██║
██████╔╝███████╗╚██████╗ ███████╗██║ ██║██║ ╚═╝ ██║██║ ╚████║██║
╚═════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
B E C L a m N I
""")
elif _args.session.lower() == "omny":
from csaxs_bec.bec_ipython_client.plugins.flomni import OMNY
_session_name = "OMNY"
omny = OMNY(bec)
logger.success("OMNY session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ██████╗ ███╗ ███╗███╗ ██╗██╗ ██╗
██╔══██╗██╔════╝██╔════╝ ██╔═══██╗████╗ ████║████╗ ██║╚██╗ ██╔╝
██████╔╝█████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║ ╚████╔╝
██╔══██╗██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║ ╚██╔╝
██████╔╝███████╗╚██████╗ ╚██████╔╝██║ ╚═╝ ██║██║ ╚████║ ██║
╚═════╝ ╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝
B E C O M N Y
""")
elif _args.session.lower() == "flomni":
from csaxs_bec.bec_ipython_client.plugins.flomni import Flomni
_session_name = "flomni"
flomni = Flomni(bec)
logger.success("flomni session loaded.")
print(r"""
██████╗ ███████╗ ██████╗ ███████╗██╗ ██████╗ ███╗ ███╗███╗ ██╗██╗
██╔══██╗██╔════╝██╔════╝ ██╔════╝██║ ██╔═══██╗████╗ ████║████╗ ██║██║
██████╔╝█████╗ ██║ █████╗ ██║ ██║ ██║██╔████╔██║██╔██╗ ██║██║
██╔══██╗██╔══╝ ██║ ██╔══╝ ██║ ██║ ██║██║╚██╔╝██║██║╚██╗██║██║
██████╔╝███████╗╚██████╗ ██║ ███████╗╚██████╔╝██║ ╚═╝ ██║██║ ╚████║██║
╚═════╝ ╚══════╝ ╚═════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝
B E C f l O M N I
""")
# SETUP BEAMLINE INFO # SETUP BEAMLINE INFO
from bec_ipython_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo from bec_ipython_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo

View File

@@ -9,27 +9,27 @@ eiger_1_5:
readoutPriority: async readoutPriority: async
softwareTrigger: False softwareTrigger: False
eiger_9: # eiger_9:
description: Eiger 9M detector # description: Eiger 9M detector
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M # deviceClass: csaxs_bec.devices.jungfraujoch.eiger_9m.Eiger9M
deviceConfig: # deviceConfig:
detector_distance: 100 # detector_distance: 100
beam_center: [0, 0] # beam_center: [0, 0]
onFailure: raise # onFailure: raise
enabled: true # enabled: true
readoutPriority: async # readoutPriority: async
softwareTrigger: False # softwareTrigger: False
ids_cam: # ids_cam:
description: IDS camera for live image acquisition # description: IDS camera for live image acquisition
deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera # deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
deviceConfig: # deviceConfig:
camera_id: 201 # camera_id: 201
bits_per_pixel: 24 # bits_per_pixel: 24
m_n_colormode: 1 # m_n_colormode: 1
live_mode: True # live_mode: True
onFailure: raise # onFailure: raise
enabled: true # enabled: true
readoutPriority: async # readoutPriority: async
softwareTrigger: True # softwareTrigger: True

View File

@@ -0,0 +1,25 @@
############################################################
##################### EPS ##################################
############################################################
x12saEPS:
description: X12SA EPS info and control
deviceClass: csaxs_bec.devices.epics.eps.EPS
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
##################### GalilRIO #############################
############################################################
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesxbox.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20

View File

@@ -1,11 +1,11 @@
# This is the main configuration file that is # This is the main configuration file that is
# commented or uncommented according to the type of experiment # commented or uncommented according to the type of experiment
optics: # optics:
- !include ./bl_optics_hutch.yaml # - !include ./bl_optics_hutch.yaml
frontend: # frontend:
- !include ./bl_frontend.yaml # - !include ./bl_frontend.yaml
endstation: endstation:
- !include ./bl_endstation.yaml - !include ./bl_endstation.yaml
@@ -16,8 +16,8 @@ detectors:
#sastt: #sastt:
# - !include ./sastt.yaml # - !include ./sastt.yaml
#flomni: flomni:
# - !include ./ptycho_flomni.yaml - !include ./ptycho_flomni.yaml
#omny: #omny:
# - !include ./ptycho_omny.yaml # - !include ./ptycho_omny.yaml

View File

@@ -471,7 +471,7 @@ omnyfsh:
#################### GUI Signals ########################### #################### GUI Signals ###########################
############################################################ ############################################################
omny_xray_gui: omny_xray_gui:
description: Gui Epics signals description: Gui signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
deviceConfig: {} deviceConfig: {}
enabled: true enabled: true
@@ -486,4 +486,25 @@ calculated_signal:
compute_method: "def just_rand():\n return 42" compute_method: "def just_rand():\n return 42"
enabled: true enabled: true
readOnly: false readOnly: false
readoutPriority: baseline readoutPriority: baseline
############################################################
#################### OMNY Pandabox #########################
############################################################
omny_panda:
readoutPriority: async
deviceClass: csaxs_bec.devices.panda_box.panda_box_omny.PandaBoxOMNY
deviceConfig:
host: omny-panda.psi.ch
signal_alias:
FMC_IN.VAL1.Min: cap_voltage_fzp_y_min
FMC_IN.VAL1.Max: cap_voltage_fzp_y_max
FMC_IN.VAL1.Mean: cap_voltage_fzp_y_mean
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
deviceTags:
- detector
enabled: true
readOnly: false
softwareTrigger: false

View File

@@ -271,4 +271,20 @@ rty:
enabled: true enabled: true
readOnly: False readOnly: False
############################################################
######################### Cameras ##########################
############################################################
cam_xeye:
description: Camera LamNI Xray eye ID15
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
deviceConfig:
camera_id: 15
bits_per_pixel: 24
num_rotation_90: 3
transpose: false
force_monochrome: true
m_n_colormode: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async

View File

@@ -309,7 +309,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Shutter opens without delay at t0, closes after exp_time * burst_count + 2ms (self._shutter_to_open_delay) # Shutter opens without delay at t0, closes after exp_time * burst_count + 2ms (self._shutter_to_open_delay)
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width) self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
self.set_delay_pairs(channel="gh", delay=self._shutter_to_open_delay, width=(shutter_width-self._shutter_to_open_delay)) self.set_delay_pairs(
channel="gh",
delay=self._shutter_to_open_delay,
width=(shutter_width - self._shutter_to_open_delay),
)
# Trigger extra pulse for MCS OR gate # Trigger extra pulse for MCS OR gate
# f = e + 1us # f = e + 1us
@@ -520,7 +524,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
- Return the status object to BEC which will automatically resolve once the status register has - Return the status object to BEC which will automatically resolve once the status register has
the END_OF_BURST bit set. The callback of the status object will also stop the polling loop. the END_OF_BURST bit set. The callback of the status object will also stop the polling loop.
""" """
overall_start = time.time()
self._stop_polling() self._stop_polling()
# NOTE If the trigger source is not SINGLE_SHOT, the DDG is triggered by an external source # NOTE If the trigger source is not SINGLE_SHOT, the DDG is triggered by an external source
@@ -559,7 +562,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Send trigger # Send trigger
self.trigger_shot.put(1, use_complete=True) self.trigger_shot.put(1, use_complete=True)
self.cancel_on_stop(status) self.cancel_on_stop(status)
logger.info(f"Configured ddg in {time.time()-overall_start}")
return status return status
def on_stop(self) -> None: def on_stop(self) -> None:

View File

@@ -261,7 +261,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
**kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance). **kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance).
""" """
with self._rlock: with self._rlock:
logger.info(f"Received update on mcs card {self.name}")
if self._omit_mca_callbacks.is_set(): if self._omit_mca_callbacks.is_set():
return # Suppress callbacks when erasing all channels return # Suppress callbacks when erasing all channels
self._mca_counter_index += 1 self._mca_counter_index += 1
@@ -293,9 +292,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
) )
# Once we have received all channels, push data to BEC and reset for next accumulation # Once we have received all channels, push data to BEC and reset for next accumulation
logger.info(
f"Received update for {attr_name}, index {self._mca_counter_index}/{self.NUM_MCA_CHANNELS}"
)
if len(self._current_data) == self.NUM_MCA_CHANNELS: if len(self._current_data) == self.NUM_MCA_CHANNELS:
logger.debug( logger.debug(
f"Current data index {self._current_data_index} complete, pushing to BEC." f"Current data index {self._current_data_index} complete, pushing to BEC."
@@ -398,11 +394,12 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# NOTE Make sure that the signal that omits mca callbacks is cleared # NOTE Make sure that the signal that omits mca callbacks is cleared
self._omit_mca_callbacks.clear() self._omit_mca_callbacks.clear()
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
# For a fly scan we need to start the mcs card ourselves # For a fly scan we need to start the mcs card ourselves
if self.scan_info.msg.scan_type == "fly": if self.scan_info.msg.scan_type == "fly":
self.erase_start.put(1) self.erase_start.put(1)
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_prescan(self) -> None | StatusBase: def on_prescan(self) -> None | StatusBase:
""" """
This method is called after on_stage and before the scan starts. For the MCS card, we need to make sure This method is called after on_stage and before the scan starts. For the MCS card, we need to make sure

View File

@@ -132,7 +132,6 @@ class Eiger(PSIDeviceBase):
if data is None: if data is None:
logger.error(f"Received image message on device {self.name} without data.") logger.error(f"Received image message on device {self.name} without data.")
return return
logger.info(f"Received preview image on device {self.name}")
self.preview_image.put(data) self.preview_image.put(data)
# pylint: disable=missing-function-docstring # pylint: disable=missing-function-docstring

View File

@@ -13,6 +13,7 @@ which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS varia
from __future__ import annotations from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
@@ -78,12 +79,38 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
""" """
_NUM_ANALOG_CHANNELS = 8 _NUM_ANALOG_CHANNELS = 8
READBACK_TIMEOUT = 0.1 # time to wait in between two readback attemps in seconds, otherwise return cached value
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs): def __init__(
self,
signal_name: str,
channel: int,
parent: GalilRIO,
readback_timeout: float = None,
**kwargs,
):
super().__init__(signal_name=signal_name, parent=parent, **kwargs) super().__init__(signal_name=signal_name, parent=parent, **kwargs)
self._channel = channel self._channel = channel
self._metadata["connected"] = False self._metadata["connected"] = False
self._readback_timeout = (
readback_timeout if readback_timeout is not None else self.READBACK_TIMEOUT
)
self._metadata["write_access"] = False self._metadata["write_access"] = False
self._last_readback = 0.0
def get(self):
current_time = time.monotonic()
if current_time - self._last_readback > self._readback_timeout:
old_value = self._readback
self._last_readback = current_time # _socket_get may rely on this value to be set.
self._readback = self._socket_get()
self._run_subs(
sub_type=self.SUB_VALUE,
old_value=old_value,
value=self._readback,
timestamp=current_time,
)
return self._readback
def _socket_set(self, val): def _socket_set(self, val):
"""Read-only signal, so set method raises an error.""" """Read-only signal, so set method raises an error."""
@@ -136,6 +163,8 @@ class GalilRIOAnalogSignalRO(GalilSignalBase):
# Run subscriptions after all readbacks have been updated # Run subscriptions after all readbacks have been updated
# on all channels except the one that triggered the update # on all channels except the one that triggered the update
# TODO for now skip running subscribers, this should be re-implemented
# once we properly handle subscriptions from bec running "read"
for walk in self.parent.walk_signals(): for walk in self.parent.walk_signals():
if walk.item.attr_name in updates: if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name] new_val, old_val = updates[walk.item.attr_name]
@@ -185,7 +214,7 @@ def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
an_channels[f"ch{i}"] = ( an_channels[f"ch{i}"] = (
GalilRIOAnalogSignalRO, GalilRIOAnalogSignalRO,
f"ch{i}", f"ch{i}",
{"kind": Kind.normal, "notify_bec": True, "channel": i, "doc": f"Analog channel {i}."}, {"kind": Kind.normal, "channel": i, "doc": f"Analog channel {i}."},
) )
return an_channels return an_channels
@@ -202,12 +231,7 @@ def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
di_out_channels[f"ch{i}"] = ( di_out_channels[f"ch{i}"] = (
GalilRIODigitalOutSignal, GalilRIODigitalOutSignal,
f"ch{i}", f"ch{i}",
{ {"kind": Kind.config, "channel": i, "doc": f"Digital output channel {i}."},
"kind": Kind.config,
"notify_bec": True,
"channel": i,
"doc": f"Digital output channel {i}.",
},
) )
return di_out_channels return di_out_channels

View File

@@ -15,7 +15,6 @@ from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilAxesReferenced, GalilAxesReferenced,
GalilController, GalilController,
GalilMotorIsMoving, GalilMotorIsMoving,
GalilMotorResolution,
GalilSetpointSignal, GalilSetpointSignal,
GalilSignalRO, GalilSignalRO,
retry_once, retry_once,
@@ -24,6 +23,19 @@ from csaxs_bec.devices.omny.galil.galil_ophyd import (
logger = bec_logger.logger logger = bec_logger.logger
class GalilMotorResolution(GalilSignalRO):
@retry_once
@threadlocked
def _socket_get(self):
if self.parent.axis_Id_numeric < 6:
return float(
self.controller.socket_put_and_receive(f"MG encpermm[{self.parent.axis_Id_numeric}]")
)
else:
return float(
self.controller.socket_put_and_receive(f"MG stppermm[{self.parent.axis_Id_numeric}]")
)
class LamniGalilController(GalilController): class LamniGalilController(GalilController):
@@ -154,11 +166,18 @@ class LamniGalilReadbackSignal(GalilSignalRO):
Returns: Returns:
float: Readback value after adjusting for sign and motor resolution. float: Readback value after adjusting for sign and motor resolution.
""" """
current_pos = float(self.controller.socket_put_and_receive(f"TD{self.parent.axis_Id}")) if self.parent.axis_Id_numeric < 6:
current_pos *= self.parent.sign current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}"))
step_mm = self.parent.motor_resolution.get() current_pos *= self.parent.sign
return current_pos / step_mm encoder_resolution = self.parent.motor_resolution.get()
logger.info(f"Read galil encoder position of axis {self.parent.axis_Id_numeric} to be TP {current_pos} with resolution {encoder_resolution}")
return current_pos / encoder_resolution
else:
current_pos = float(self.controller.socket_put_and_receive(f"TD{self.parent.axis_Id}"))
current_pos *= self.parent.sign
step_mm = self.parent.motor_resolution.get()
return current_pos / step_mm
def read(self): def read(self):
self._metadata["timestamp"] = time.time() self._metadata["timestamp"] = time.time()
val = super().read() val = super().read()

View File

@@ -65,10 +65,8 @@ class RtLamniController(Controller):
"_position_sampling_single_read", "_position_sampling_single_read",
"_position_sampling_single_reset_and_start_sampling", "_position_sampling_single_reset_and_start_sampling",
"show_signal_strength_interferometer", "show_signal_strength_interferometer",
"show_interferometer_positions",
"show_analog_signals", "show_analog_signals",
"show_feedback_status", "show_feedback_status",
] ]
def __init__( def __init__(

View File

View File

@@ -0,0 +1,103 @@
"""Module to integrate the PandaBox for cSAXS measurements."""
import time
from bec_lib.logger import bec_logger
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
logger = bec_logger.logger
class PandaBoxCSAXS(PandaBox):
"""
PandaBox integration for cSAXS. This class implements cSAXS specific logic for the PandaBox integration.
TODO: This logic is not yet mapped to any existing hardware. Adapt Docstring once the hardware is defined and integrated.
"""
def on_init(self):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
def on_stage(self):
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
logger.info(f"PandaBox {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_complete(self):
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
def _check_capture_complete():
captured = 0
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
captured = int(ret[0].split("=")[-1])
time.sleep(0.01)
if (time.monotonic() - start_time) > self._timeout_on_completed / 2:
logger.info(
f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
)
if (time.monotonic() - start_time) > self._timeout_on_completed:
raise TimeoutError(
f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
)
finally:
self._disarm()
status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
self.cancel_on_stop(status_captured)
return status_captured
if __name__ == "__main__":
import time
panda = PandaBoxCSAXS(
name="omny_panda",
host="omny-panda.psi.ch",
signal_alias={
"FMC_IN.VAL2.Value": "alias",
"FMC_IN.VAL1.Min": "alias2",
"FMC_IN.VAL1.Max": "alias3",
"FMC_IN.VAL1.Mean": "alias4",
},
)
panda.on_connected()
status = StatusBase(obj=panda)
panda.add_status_callback(
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
)
panda.stop()
status.wait(timeout=2)
panda.unstage()
logger.info(f"Panda connected")
ret = panda.stage()
logger.info(f"Panda staged")
ret = panda.pre_scan()
ret.wait(timeout=5)
logger.info(f"Panda pre scan done")
time.sleep(5)
panda.stop()
st = panda.complete()
st.wait(timeout=5)
logger.info(f"Measurement completed")
panda.unstage()
logger.info(f"Panda Unstaged")

View File

@@ -0,0 +1,99 @@
"""Module to integrate the PandaBox for cSAXS measurements."""
import time
from bec_lib.logger import bec_logger
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
logger = bec_logger.logger
class PandaBoxOMNY(PandaBox):
"""PandaBox integration for OMNY. This class implements OMNY specific logic for the PandaBox integration."""
def on_init(self):
super().on_init()
self._acquisition_group = "burst"
self._timeout_on_completed = 10
def on_stage(self):
start_time = time.time()
super().on_stage()
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_info.msg.scan_type == "fly":
self._acquisition_group = "fly"
elif self.scan_info.msg.scan_type == "step":
if self.scan_info.msg.scan_parameters["frames_per_trigger"] == 1:
self._acquisition_group = "monitored"
else:
self._acquisition_group = "burst"
logger.info(f"PandaBox {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
def on_complete(self):
"""On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox."""
def _check_capture_complete():
captured = 0
start_time = time.monotonic()
try:
expected_points = int(
self.scan_info.msg.num_points
* self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
)
while captured < expected_points:
ret = self.send_raw("*PCAP.CAPTURED?")
captured = int(ret[0].split("=")[-1])
time.sleep(0.01)
if (time.monotonic() - start_time) > self._timeout_on_completed / 2:
logger.info(
f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
)
if (time.monotonic() - start_time) > self._timeout_on_completed:
raise TimeoutError(
f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
)
finally:
self._disarm()
status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
self.cancel_on_stop(status_captured)
return status_captured
if __name__ == "__main__":
import time
panda = PandaBoxOMNY(
name="omny_panda",
host="omny-panda.psi.ch",
signal_alias={
"FMC_IN.VAL2.Value": "alias",
"FMC_IN.VAL1.Min": "alias2",
"FMC_IN.VAL1.Max": "alias3",
"FMC_IN.VAL1.Mean": "alias4",
},
)
panda.on_connected()
status = StatusBase(obj=panda)
panda.add_status_callback(
status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
)
panda.stop()
status.wait(timeout=2)
panda.unstage()
logger.info(f"Panda connected")
ret = panda.stage()
logger.info(f"Panda staged")
ret = panda.pre_scan()
ret.wait(timeout=5)
logger.info(f"Panda pre scan done")
time.sleep(5)
panda.stop()
st = panda.complete()
st.wait(timeout=5)
logger.info(f"Measurement completed")
panda.unstage()
logger.info(f"Panda Unstaged")

View File

@@ -169,6 +169,9 @@ class LamNIMixin:
self.device_manager.devices.lsamx.read_only = True self.device_manager.devices.lsamx.read_only = True
self.device_manager.devices.lsamy.read_only = True self.device_manager.devices.lsamy.read_only = True
#update angle readback before start of the scan
yield from self.stubs.send_rpc_and_wait("lsamrot", "readback.get")
yield from self.stubs.send_rpc_and_wait("rtx", "controller.feedback_enable_without_reset") yield from self.stubs.send_rpc_and_wait("rtx", "controller.feedback_enable_without_reset")
@@ -210,7 +213,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
arg_input = {} arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs): def __init__(self, *args, parameter: dict = None, frames_per_trigger:int=1, exp_time:float=0,**kwargs):
""" """
A LamNI scan following Fermat's spiral. A LamNI scan following Fermat's spiral.
@@ -230,10 +233,10 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
Examples: Examples:
>>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1) >>> scans.lamni_fermat_scan(fov_size=[20], step=0.5, exp_time=0.1)
>>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1) >>> scans.lamni_fermat_scan(fov_size=[20, 25], center_x=0.02, center_y=0, shift_x=0, shift_y=0, angle=0, step=0.5, fov_circular=0, exp_time=0.1, frames_per_trigger=1)
""" """
super().__init__(parameter=parameter, **kwargs) super().__init__(parameter=parameter, frames_per_trigger=frames_per_trigger, exp_time=exp_time,**kwargs)
self.axis = [] self.axis = []
scan_kwargs = parameter.get("kwargs", {}) scan_kwargs = parameter.get("kwargs", {})
self.fov_size = scan_kwargs.get("fov_size") self.fov_size = scan_kwargs.get("fov_size")
@@ -482,6 +485,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
yield from self.open_scan() yield from self.open_scan()
yield from self.stage() yield from self.stage()
yield from self.run_baseline_reading() yield from self.run_baseline_reading()
yield from self.pre_scan()
yield from self.scan_core() yield from self.scan_core()
yield from self.finalize() yield from self.finalize()
yield from self.unstage() yield from self.unstage()

View File

@@ -52,6 +52,7 @@ class FlomniFermatScan(SyncFlyScanBase):
angle: float = None, angle: float = None,
corridor_size: float = 3, corridor_size: float = 3,
parameter: dict = None, parameter: dict = None,
frames_per_trigger:int=1,
**kwargs, **kwargs,
): ):
""" """
@@ -62,7 +63,8 @@ class FlomniFermatScan(SyncFlyScanBase):
fovy(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 100 um fovy(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 100 um
cenx(float) [um]: center position in x. cenx(float) [um]: center position in x.
ceny(float) [um]: center position in y. ceny(float) [um]: center position in y.
exp_time(float) [s]: exposure time exp_time(float) [s]: exposure time per burst frame
frames_per_trigger(int) : Number of burst frames per point
step(float) [um]: stepsize step(float) [um]: stepsize
zshift(float) [um]: shift in z zshift(float) [um]: shift in z
angle(float) [deg]: rotation angle (will rotate first) angle(float) [deg]: rotation angle (will rotate first)
@@ -71,10 +73,10 @@ class FlomniFermatScan(SyncFlyScanBase):
Returns: Returns:
Examples: Examples:
>>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01) >>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)
""" """
super().__init__(parameter=parameter, exp_time=exp_time, **kwargs) super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
self.show_live_table = False self.show_live_table = False
self.axis = [] self.axis = []
self.fovx = fovx self.fovx = fovx
@@ -323,6 +325,7 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.stage() yield from self.stage()
yield from self.run_baseline_reading() yield from self.run_baseline_reading()
yield from self._prepare_setup_part2() yield from self._prepare_setup_part2()
yield from self.pre_scan()
yield from self.scan_core() yield from self.scan_core()
yield from self.finalize() yield from self.finalize()
yield from self.unstage() yield from self.unstage()

View File

@@ -1,4 +1,4 @@
""" Module with JungfrauJochTestScan class. """ """Module with JungfrauJochTestScan class."""
from bec_lib import bec_logger from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion

View File

@@ -51,6 +51,7 @@ class OMNYFermatScan(SyncFlyScanBase):
angle: float = None, angle: float = None,
corridor_size: float = 3, corridor_size: float = 3,
parameter: dict = None, parameter: dict = None,
frames_per_trigger:int=1,
**kwargs, **kwargs,
): ):
""" """
@@ -62,6 +63,7 @@ class OMNYFermatScan(SyncFlyScanBase):
cenx(float) [um]: center position in x. cenx(float) [um]: center position in x.
ceny(float) [um]: center position in y. ceny(float) [um]: center position in y.
exp_time(float) [s]: exposure time exp_time(float) [s]: exposure time
frames_per_trigger:int: Number of burst frames per trigger, defaults to 1.
step(float) [um]: stepsize step(float) [um]: stepsize
zshift(float) [um]: shift in z zshift(float) [um]: shift in z
angle(float) [deg]: rotation angle (will rotate first) angle(float) [deg]: rotation angle (will rotate first)
@@ -73,7 +75,7 @@ class OMNYFermatScan(SyncFlyScanBase):
>>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=10, ceny=0, zshift=0, angle=0, step=2, exp_time=0.01) >>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=10, ceny=0, zshift=0, angle=0, step=2, exp_time=0.01)
""" """
super().__init__(parameter=parameter, **kwargs) super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
self.axis = [] self.axis = []
self.fovx = fovx self.fovx = fovx
self.fovy = fovy self.fovy = fovy
@@ -299,6 +301,7 @@ class OMNYFermatScan(SyncFlyScanBase):
yield from self.stage() yield from self.stage()
yield from self.run_baseline_reading() yield from self.run_baseline_reading()
yield from self._prepare_setup_part2() yield from self._prepare_setup_part2()
yield from self.pre_scan()
yield from self.scan_core() yield from self.scan_core()
yield from self.finalize() yield from self.finalize()
yield from self.unstage() yield from self.unstage()

View File

@@ -108,12 +108,17 @@ The nano-positioning is controlled by a feedback loop running on a real-time lin
Once the loop has started, it is possible to start bec with the flOMNI configuration file. Once the loop has started, it is possible to start bec with the flOMNI configuration file.
Starting bec with session will load the scripts
`bec --session flomni`
The flOMNI scripts can be loaded manually by
`from csaxs_bec.bec_ipython_client.plugins.flomni import Flomni`
`flomni = Flomni(bec)`
Loading the flOMNI configuration (this command will load the OMNY configuration only - isolated from the beamline) Loading the flOMNI configuration (this command will load the OMNY configuration only - isolated from the beamline)
`bec.config.update_session_with_file("/bec/csaxs_bec/csaxs_bec/device_configs/flomni_config.yaml")` `bec.config.update_session_with_file("/bec/csaxs_bec/csaxs_bec/device_configs/flomni_config.yaml")`
Loading the flOMNI scripts
`from csaxs_bec.bec_ipython_client.plugins.flomni import Flomni`
`flomni = Flomni(bec)`
If the realtime system is restarted, bec will lose communication. To restart: If the realtime system is restarted, bec will lose communication. To restart:
`flomni.rt_off()` … then wait a few seconds `flomni.rt_off()` … then wait a few seconds
@@ -138,10 +143,14 @@ This script will first verify that the stages are not in an initialized state, a
The positions of the optics stages are stored as stage parameters and are thus linked to the configuration file. The positions of the optics stages are stored as stage parameters and are thus linked to the configuration file.
Example: The OSAx “in” position can be reviewed by `dev.fosax.user_parameter` Example: The OSAx “in” position can be reviewed by `dev.fosax.user_parameter`
Update the value by (example "fosax", "in") by `dev.fosax.update_user_parameter({"in":value})` Update the value by (example "fosax", "in") by `dev.fosax.update_user_parameter({"in":value})`
Important note: if these values are changed, they are not automatically stored to the config file and will only be available in the current session.
`flomni.ffzp_info()` shows info about the available FZPs at the current energy of the beamline. Optional parameter is the photon _energy_ in keV. `flomni.ffzp_info()` shows info about the available FZPs at the current energy of the beamline. Optional parameter is the photon _energy_ in keV.
Example: `flomni.ffzp_info(6.2)` Example: `flomni.ffzp_info(6.2)`
Documents about availabe optics can be accessed by
`flomni.flomnigui_docs`
The [laser feedback](user.ptychography.flomni.laser_feedback) will be disabled and fine alignment lost if foptx/y are moved! The [laser feedback](user.ptychography.flomni.laser_feedback) will be disabled and fine alignment lost if foptx/y are moved!
Following functions exist to move the optics in and out, with self-explaining naming. Following functions exist to move the optics in and out, with self-explaining naming.
@@ -193,14 +202,15 @@ The basic scan function can be called by `scans.flomni_fermat_scan()` and offers
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um | | fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
| cenx (float) | center position in x | | cenx (float) | center position in x |
| ceny (float) | center position in y | | ceny (float) | center position in y |
| exp_time (float) | exposure time | | exp_time (float) | exposure time per frame |
| frames_per_trigger(int) | Number of burst frames per position |
| step (float) | stepsize | | step (float) | stepsize |
| zshift (float) | shift in z | | zshift (float) | shift in z |
| angle (float) | rotation angle (will rotate first) | | angle (float) | rotation angle (will rotate first) |
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um | | corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
Example: Example:
`scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)` `scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
#### Overview of the alignment steps #### Overview of the alignment steps

View File

@@ -102,13 +102,16 @@ The nano-positioning is controlled by a feedback loop running on a real-time lin
Once the loop has started, it is possible to start bec with the LamNI configuration file. Once the loop has started, it is possible to start bec with the LamNI configuration file.
Loading the LamNI configuration (this command will load the LamNI configuration only - isolated from the beamline) Loading the LamNI scripts is done by starting bec as
`bec.config.update_session_with_file("/bec/csaxs_bec/csaxs_bec/device_configs/lamni_config.yaml")` `bec --session lamni`
Loading the LamNI scripts The scripts can alternatively manually be loaded by
`from csaxs_bec.bec_ipython_client.plugins.LamNI import LamNI` `from csaxs_bec.bec_ipython_client.plugins.LamNI import LamNI`
`lamni = LamNI(bec)` `lamni = LamNI(bec)`
Loading the LamNI configuration (this command will load the LamNI configuration only - isolated from the beamline)
`bec.config.update_session_with_file("/bec/csaxs_bec/csaxs_bec/device_configs/lamni_config.yaml")`
If the realtime system is restarted, BEC will lose communication. To restart: If the realtime system is restarted, BEC will lose communication. To restart:
`lamni.rt_off()` … then wait a 10 seconds `lamni.rt_off()` … then wait a 10 seconds
`lamni.rt_on()` `lamni.rt_on()`
@@ -152,6 +155,12 @@ The underlying scan function can be called as
Use `scans.lamni_fermat_scan?`for detailed information. A prerequisite for scanning is a running feedback system. Use `scans.lamni_fermat_scan?`for detailed information. A prerequisite for scanning is a running feedback system.
### GUI tools
During operation the BEC GUI will show the relevant cameras or progress information. To manually switch view TAB completion on 'lamni.lamnigui_' will show all options to control the GUI. Most useful
'lamni.lamnigui_show_progress()' will show the measurement progress GUI
'lamnigui_show_xeyealign()' will show the XrayEye alignment GUI
### X-ray optics alignment ### X-ray optics alignment
The positions of the optics stages are stored as stage parameters and are thus linked to the configuration file. The positions of the optics stages are stored as stage parameters and are thus linked to the configuration file.

View File

@@ -327,14 +327,15 @@ The basic scan function can be called by `scans.omny_fermat_scan()` and offers a
| fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um | | fovy (float) | Fov in the piezo plane (i.e. piezo range). Max 100 um |
| cenx (float) | center position in x | | cenx (float) | center position in x |
| ceny (float) | center position in y | | ceny (float) | center position in y |
| exp_time (float) | exposure time | | exp_time (float) | exposure time per frame |
| frames_per_trigger(int) | Number of burst frames per position |
| step (float) | stepsize | | step (float) | stepsize |
| zshift (float) | shift in z | | zshift (float) | shift in z |
| angle (float) | rotation angle (will rotate first) | | angle (float) | rotation angle (will rotate first) |
| corridor_size (float) | corridor size for the corridor optimization. Default 3 um | | corridor_size (float) | corridor size for the corridor optimization. Default 3 um |
Example: Example:
`scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01)` `scans.omny_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1)`
#### Overview of the alignment steps #### Overview of the alignment steps

View File

@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
name = "csaxs_bec" name = "csaxs_bec"
version = "0.0.0" version = "0.0.0"
description = "The cSAXS plugin repository for BEC" description = "The cSAXS plugin repository for BEC"
requires-python = ">=3.10" requires-python = ">=3.11"
classifiers = [ classifiers = [
"Development Status :: 3 - Alpha", "Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",

View File

@@ -35,16 +35,16 @@ def test_save_frame(bec_client_mock):
lamni = LamNI(client) lamni = LamNI(client)
align = XrayEyeAlign(client, lamni) align = XrayEyeAlign(client, lamni)
with mock.patch( with mock.patch(
"csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put" "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
) as epics_put_mock: ) as epics_put_mock:
align.save_frame() align.save_frame()
epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1) epics_put_mock.assert_called_once_with("XOMNYI-XEYE-SAVFRAME:0", 1)
def test_update_frame(bec_client_mock): def test_update_frame(bec_client_mock):
epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_put" epics_put = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_put"
epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.epics_get" epics_get = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.epics_get"
fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.x_ray_eye_align.fshopen" fshopen = "csaxs_bec.bec_ipython_client.plugins.LamNI.alignment.fshopen"
client = bec_client_mock client = bec_client_mock
client.device_manager.devices.xeye = DeviceBase( client.device_manager.devices.xeye = DeviceBase(
name="xeye", name="xeye",

View File

@@ -0,0 +1,190 @@
"""Module for testing the PandaBoxCSAXS and PandaBoxOMNY devices."""
# pylint: skip-file
from __future__ import annotations
from unittest import mock
import pytest
from ophyd import Staged
from csaxs_bec.devices.panda_box.panda_box import PandaBoxCSAXS
from csaxs_bec.devices.panda_box.panda_box_omny import PandaBoxOMNY
@pytest.fixture
def panda_omny():
dev_name = "panda_omny"
dev = PandaBoxOMNY(
name=dev_name,
host="omny-panda-box.psi.ch",
signal_alias={
"FMC_IN.VAL1.Min": "cap_voltage_fzp_y_min",
"FMC_IN.VAL1.Max": "cap_voltage_fzp_y_max",
"FMC_IN.VAL1.Mean": "cap_voltage_fzp_y_mean",
"FMC_IN.VAL2.Min": "cap_voltage_fzp_x_min",
"FMC_IN.VAL2.Max": "cap_voltage_fzp_x_max",
"FMC_IN.VAL2.Mean": "cap_voltage_fzp_x_mean",
},
)
yield dev
@pytest.fixture
def panda_csaxs():
dev_name = "panda_csaxs"
dev = PandaBoxCSAXS(name=dev_name, host="csaxs-panda-box.psi.ch")
yield dev
def test_panda_omny(panda_omny):
assert panda_omny.name == "panda_omny"
assert panda_omny.host == "omny-panda-box.psi.ch"
all_signal_names = [name for name, _ in panda_omny.data.signals]
# Check that the signal aliases are correctly set up
assert "cap_voltage_fzp_y_min" in all_signal_names
assert "cap_voltage_fzp_y_max" in all_signal_names
assert "cap_voltage_fzp_y_mean" in all_signal_names
assert "cap_voltage_fzp_x_min" in all_signal_names
assert "cap_voltage_fzp_x_max" in all_signal_names
assert "cap_voltage_fzp_x_mean" in all_signal_names
# Check that the original signal names are not present
assert "FMC_IN.VAL1.Min" not in all_signal_names
assert "FMC_IN.VAL1.Max" not in all_signal_names
assert "FMC_IN.VAL1.Mean" not in all_signal_names
assert "FMC_IN.VAL2.Min" not in all_signal_names
assert "FMC_IN.VAL2.Max" not in all_signal_names
assert "FMC_IN.VAL2.Mean" not in all_signal_names
assert panda_omny._acquisition_group == "burst"
assert panda_omny._timeout_on_completed == 10
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
],
)
def test_panda_omny_stage(panda_omny, scan_type, frames_per_trigger, expected_acquisition_group):
# Check that the stage signal is present and has the correct PV
assert len(panda_omny._status_callbacks) == 0
panda_omny.scan_info.msg.scan_type = scan_type
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_omny.stage()
assert panda_omny._acquisition_group == expected_acquisition_group
assert panda_omny.staged == Staged.yes
def test_panda_omny_complete(panda_omny):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_omny.scan_info.msg.num_points = 1
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_omny._timeout_on_completed = 0.5 # Set a short timeout for testing
def _mock_return_captured(*args, **kwargs):
return ["=0"]
# Timeout Error on complete
with (
mock.patch.object(panda_omny, "send_raw", side_effect=_mock_return_captured),
mock.patch.object(panda_omny, "_disarm", return_value=None) as mock_disarm,
):
status = panda_omny.on_complete()
assert status.done is False
assert status.success is False
with pytest.raises(TimeoutError):
status.wait(timeout=4)
mock_disarm.assert_called_once()
# Successful complete
panda_omny._timeout_on_completed = 5
with (
mock.patch.object(panda_omny, "send_raw", side_effect=[["=0"], ["=0"], ["=1"]]),
mock.patch.object(panda_omny, "_disarm", return_value=None) as mock_disarm,
):
status = panda_omny.on_complete()
assert status.done is False
assert status.success is False
status.wait(timeout=4)
mock_disarm.assert_called_once()
assert status.done is True
assert status.success is True
def test_panda_csaxs(panda_csaxs):
assert panda_csaxs.name == "panda_csaxs"
assert panda_csaxs.host == "csaxs-panda-box.psi.ch"
assert panda_csaxs._acquisition_group == "burst"
assert panda_csaxs._timeout_on_completed == 10
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
],
)
def test_panda_csaxs_stage(panda_csaxs, scan_type, frames_per_trigger, expected_acquisition_group):
"""Test the on_stage method of the PandaBoxCSAXS device for different scan types and frames per trigger."""
assert len(panda_csaxs._status_callbacks) == 0
panda_csaxs.scan_info.msg.scan_type = scan_type
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_csaxs.stage()
assert panda_csaxs._acquisition_group == expected_acquisition_group
assert panda_csaxs.staged == Staged.yes
def test_panda_csaxs_complete(panda_csaxs):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_csaxs.scan_info.msg.num_points = 1
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_csaxs._timeout_on_completed = 0.5 # Set a short timeout for testing
def _mock_return_captured(*args, **kwargs):
return ["=0"]
# Timeout Error on complete
with (
mock.patch.object(panda_csaxs, "send_raw", side_effect=_mock_return_captured),
mock.patch.object(panda_csaxs, "_disarm", return_value=None) as mock_disarm,
):
status = panda_csaxs.on_complete()
assert status.done is False
assert status.success is False
with pytest.raises(TimeoutError):
status.wait(timeout=4)
mock_disarm.assert_called_once()
# Successful complete
panda_csaxs._timeout_on_completed = 5
with (
mock.patch.object(panda_csaxs, "send_raw", side_effect=[["=0"], ["=0"], ["=1"]]),
mock.patch.object(panda_csaxs, "_disarm", return_value=None) as mock_disarm,
):
status = panda_csaxs.on_complete()
assert status.done is False
assert status.success is False
status.wait(timeout=4)
mock_disarm.assert_called_once()
assert status.done is True
assert status.success is True

View File

@@ -302,6 +302,36 @@ def device_manager_mock():
action="set", action="set",
parameter={"value": 2.1508313829565293}, parameter={"value": 2.1508313829565293},
), ),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
action="pre_scan",
parameter={},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="set",
parameter={"value": 1.3681828686580249},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="set",
parameter={"value": 2.1508313829565293},
),
None, None,
messages.DeviceInstructionMessage( messages.DeviceInstructionMessage(
metadata={ metadata={