9 Commits

Author SHA1 Message Date
x06da 129bfeb136 Implementing beamline state changes
CI for pxiii_bec / test (push) Successful in 38s
2026-06-04 16:36:01 +02:00
x06da 6ef7020150 Transfer macros from pxii to pxiii
CI for pxiii_bec / test (push) Successful in 29s
CI for pxiii_bec / test (pull_request) Successful in 28s
2026-05-27 16:33:11 +02:00
perl_d f59c77f142 Updating to template version 1.4.0
CI for pxiii_bec / test (pull_request) Successful in 32s
CI for pxiii_bec / test (push) Successful in 30s
2026-05-20 11:01:36 +02:00
perl_d 4d2a4c5496 Updating to template version 1.3.2
CI for pxiii_bec / test (pull_request) Successful in 31s
CI for pxiii_bec / test (push) Successful in 29s
2026-05-20 10:40:24 +02:00
wyzula_j ad6991208a fix(bec widgets): designer plugins fixed for widgets
CI for pxiii_bec / test (pull_request) Successful in 4m39s
CI for pxiii_bec / test (push) Successful in 27s
2026-05-04 15:25:11 +02:00
wyzula_j 8a023aff5a fix(bec-widgets): migration of scripts to V3
CI for pxiii_bec / test (pull_request) Successful in 32s
CI for pxiii_bec / test (push) Successful in 29s
2026-03-05 13:59:55 +01:00
perl_d f5a6b20eb8 Update repo with template version v1.2.8
CI for pxiii_bec / test (pull_request) Successful in 32s
CI for pxiii_bec / test (push) Successful in 1m21s
2026-02-27 15:49:26 +01:00
perl_d 624da08a27 Update repo with template version v1.2.7
CI for pxiii_bec / test (push) Failing after 0s
CI for pxiii_bec / test (pull_request) Failing after 0s
2026-02-27 12:11:40 +01:00
appel_c eb5a9c89ca feat: update repo with copier template for gitea migration 2025-09-11 17:02:57 +02:00
45 changed files with 4988 additions and 868 deletions
+1 -1
View File
@@ -2,7 +2,7 @@
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.0.0
_commit: v1.4.0
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: pxiii_bec
+102
View File
@@ -0,0 +1,102 @@
name: CI for pxiii_bec
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
required: false
type: string
default: "main"
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
required: false
type: string
default: "main"
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
required: false
type: string
default: "main"
BEC_PLUGIN_REPO_BRANCH:
description: "Branch of the BEC Plugin Repository to install"
required: false
type: string
default: "main"
PYTHON_VERSION:
description: "Python version to use"
required: false
type: string
default: "3.12"
permissions:
pull-requests: write
jobs:
test:
runs-on: ubuntu-latest
env:
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/pxiii_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./pxiii_bec
- name: Lint for merge conflicts from template updates
shell: bash
# Find all Copier conflicts except this line
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
- name: Checkout BEC Core
uses: actions/checkout@v4
with:
repository: bec/bec
ref: "${{ inputs.BEC_CORE_BRANCH || 'main' }}"
path: ./bec
- name: Checkout Ophyd Devices
uses: actions/checkout@v4
with:
repository: bec/ophyd_devices
ref: "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}"
path: ./ophyd_devices
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec/bec_widgets
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
path: ./bec_widgets
- name: Install dependencies
shell: bash
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
- name: Install Python dependencies
shell: bash
run: |
pip install uv
uv pip install --system -e ./ophyd_devices
uv pip install --system -e ./bec/bec_lib[dev]
uv pip install --system -e ./bec/bec_ipython_client
uv pip install --system -e ./bec/bec_server[dev]
uv pip install --system -e ./bec_widgets[dev,pyside6]
uv pip install --system -e ./pxiii_bec
- name: Run Pytest with Coverage
id: coverage
run: pytest --random-order --cov=./pxiii_bec --cov-config=./pxiii_bec/pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail ./pxiii_bec/tests/ || test $? -eq 5
+70
View File
@@ -0,0 +1,70 @@
name: Create template upgrade PR for pxiii_bec
on:
workflow_dispatch:
permissions:
pull-requests: write
jobs:
create_update_branch_and_pr:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Checkout
uses: actions/checkout@v4
- name: Create virtualenv
run: |
python -m virtualenv .venv
- name: Install tools
run: |
source .venv/bin/activate
pip install copier PySide6 bec_lib
- name: Perform update
run: |
source .venv/bin/activate
git config --global user.email "bec_ci_staging@psi.ch"
git config --global user.name "BEC automated CI"
branch="chore/update-template-$(python -m uuid)"
echo "switching to branch $branch"
git checkout -b $branch
echo "Running copier update..."
copier update --trust --defaults --conflict inline 2>&1 | tee copier.log
status=${PIPESTATUS[0]}
output="$(cat copier.log)"
echo $output
msg="$(printf '%s\n' "$output" | head -n 1)"
if ! grep -q "make_commit: true" .copier-answers.yml ; then
echo "Autocommit not made, committing..."
git add -A
git commit -a -m "$msg"
fi
if diff-index --quiet HEAD ; then
echo "No changes detected"
exit 0
fi
git push -u origin $branch
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"Template: $(echo $msg)\",
\"body\": \"This PR was created by Gitea Actions\",
\"head\": \"$(echo $branch)\",
\"base\": \"main\"
}"
-7
View File
@@ -1,7 +0,0 @@
include:
- file: /templates/plugin-repo-template.yml
inputs:
name: pxiii_bec
target: pxiii_bec
branch: $CHILD_PIPELINE_BRANCH
project: bec/awi_utils
@@ -43,3 +43,6 @@ if _args.session.lower() == "alignment":
# SETUP PROMPTS
bec._ip.prompts.username = _session_name
bec._ip.prompts.status = 1
d, planner = init_beamline_environment()
@@ -1,10 +1,14 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to add additional command line arguments.
is started. It can be used to add additional command line arguments.
"""
import os
from bec_lib.service_config import ServiceConfig
import pxiii_bec
def extend_command_line_args(parser):
"""
@@ -18,6 +22,11 @@ def extend_command_line_args(parser):
def get_config() -> ServiceConfig:
"""
Create and return the service configuration.
Create and return the ServiceConfig for the plugin repository
"""
return ServiceConfig(redis={"host": "x06da-bec-001", "port": 6379})
deployment_path = os.path.dirname(os.path.dirname(os.path.dirname(pxiii_bec.__file__)))
files = os.listdir(deployment_path)
if "bec_config.yaml" in files:
return ServiceConfig(config_path=os.path.join(deployment_path, "bec_config.yaml"))
else:
return ServiceConfig(redis={"host": "localhost", "port": 6379})
@@ -28,49 +28,30 @@ class PlotUpdate(AutoUpdates):
Args:
msg (ScanStatusMessage): The scan status message.
"""
if msg.scan_name == "line_scan" and msg.scan_report_devices:
return self.simple_line_scan(msg)
if msg.scan_name == "grid_scan" and msg.scan_report_devices:
return self.simple_grid_scan(msg)
dev_x = msg.scan_report_devices[0]
if "kwargs" in msg.request_inputs:
dev_y = msg.request_inputs["kwargs"].get("plot", None)
if dev_y is not None:
# Set the dock to the waveform widget
wf = self.set_dock_to_widget("Waveform")
# if "kwargs" in msg.request_inputs:
# dev_plt = msg.request_inputs["kwargs"].get("plot", None)
# if dev_plt is not None:
# # Handle depending on scan dimension
# if len(msg.scan_report_devices) == 1:
# dev_x = msg.scan_report_devices[0]
# # Set the dock to the waveform widget
# wf = self.set_dock_to_widget("Waveform")
# # Clear the waveform widget and plot the data
# wf.clear_all()
# wf.plot(
# x_name=dev_x,
# y_name=dev_plt,
# label=f"Scan {msg.info.scan_number} - {dev_plt}",
# title=f"Scan {msg.info.scan_number}",
# x_label=dev_x,
# y_label=dev_plt,
# )
# if len(msg.scan_report_devices) == 2:
# dev_x = msg.scan_report_devices[0]
# dev_y = msg.scan_report_devices[1]
# # Set the dock to the waveform widget
# wf = self.set_dock_to_widget("Waveform")
# # Clear the waveform widget and plot the data
# wf.clear_all()
# wf.plot(
# x_name=dev_x,
# y_name=dev_y,
# z_name=dev_plt,
# label=f"Scan {msg.info.scan_number} - {dev_plt}",
# title=f"Scan {msg.info.scan_number} - {dev_plt}",
# x_label=dev_x,
# y_label=dev_y,
# z_label=dev_plt,
# )
# elif msg.scan_name == "line_scan" and msg.scan_report_devices:
# return self.simple_line_scan(msg)
# elif msg.scan_name == "grid_scan" and msg.scan_report_devices:
# return self.simple_grid_scan(msg)
# elif msg.scan_report_devices:
# return self.best_effort(msg)
# Clear the waveform widget and plot the data
wf.clear_all()
wf.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {msg.info.scan_number} - {dev_y}",
title=f"Scan {msg.info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
elif msg.scan_report_devices:
return self.best_effort(msg)
return None
def on_scan_closed(self, msg: ScanStatusMessage) -> None:
+3 -1
View File
@@ -5,7 +5,7 @@ from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
logger = bec_logger.logger
@@ -18,6 +18,8 @@ _Widgets = {
class ScanHistory(RPCBase):
_IMPORT_MODULE = "pxiii_bec.bec_widgets.widgets.scan_history.scan_history"
@rpc_call
def select_scan_from_history(self, value: "int") -> "None":
"""
@@ -0,0 +1,13 @@
# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"ScanHistory": ("pxiii_bec.bec_widgets.widgets.scan_history.scan_history", "ScanHistory"),
}
widget_icons = {
"ScanHistory": "widgets",
}
@@ -0,0 +1,211 @@
states:
robot_sample_exchange:
allow_modifiers: true
bl_pos: in
bl_bright: 'off'
bs_pos: in
bs_z: safe
coll_y: out
cryo_pos: in
det_cov: 'close'
diag_y: out
fl_bright: 'off'
aerotech_x: in
aerotech_y: mount
aerotech_z: mount
aerotech_u: mount
smargon_x: mount
smargon_y: mount
smargon_chi: mount
smargon_phi: mount
xrf_pos: out
sample_alignment:
allow_modifiers: true
bl_pos: in
bl_bright: 'on'
bs_pos: in
bs_z: safe
coll_y: out
cryo_pos: in
det_cov: 'close'
diag_y: out
fl_bright: 'on'
aerotech_x: in
aerotech_y: work
aerotech_z: work
# aerotech_u: mount
# smargon_x: mount
# smargon_y: mount
# smargon_chi: mount
# smargon_phi: mount
xrf_pos: out
data_collection:
allow_modifiers: true
bl_pos: out
bl_bright: 'off'
bs_pos: in
bs_z: safe
coll_y: in
cryo_pos: in
det_cov: 'open'
diag_y: out
fl_bright: 'on'
aerotech_x: in
aerotech_y: work
aerotech_z: work
# aerotech_u: mount
# smargon_x: mount
# smargon_y: mount
# smargon_chi: mount
# smargon_phi: mount
xrf_pos: out
DC_XRF:
allow_modifiers: true
# bl_pos: out
bl_bright: 'off'
bs_pos: in
bs_z: safe
coll_y: in
cryo_pos: in
det_cov: 'close'
diag_y: out
fl_bright: 'on'
aerotech_x: in
aerotech_y: work
aerotech_z: work
aerotech_u: mount
# smargon_x: mount
# smargon_y: mount
# smargon_chi: mount
# smargon_phi: mount
xrf_pos: in
manual_sample_exchange:
allow_modifiers: true
bl_pos: out
bl_bright: 'off'
bs_pos: out
bs_z: safe
coll_y: park
cryo_pos: in
det_cov: 'close'
diag_y: park
fl_bright: 'off'
aerotech_x: in
aerotech_y: mount
aerotech_z: mount
aerotech_u: mount
smargon_x: mount
smargon_y: mount
smargon_chi: mount
smargon_phi: mount
xrf_pos: out
beam_visualisation:
bl_pos: out
bl_bright: 'off'
bs_pos: in
bs_z: safe
coll_y: out
cryo_pos: out
det_cov: 'close'
diag_y: scint
fl_bright: 'off'
aerotech_x: out
aerotech_y: mount
aerotech_z: mount
aerotech_u: mount
smargon_x: mount
smargon_y: mount
smargon_chi: mount
smargon_phi: mount
xrf_pos: out
flux_measurement:
bl_pos: in
bl_bright: 'off'
bs_pos: in
bs_z: safe
coll_y: out
cryo_pos: out
det_cov: 'close'
diag_y: i1
fl_bright: 'off'
aerotech_x: out
aerotech_y: mount
aerotech_z: mount
aerotech_u: mount
smargon_x: mount
smargon_y: mount
smargon_chi: mount
smargon_phi: mount
xrf_pos: out
beamstop_alignment:
bl_pos: out
bl_bright: 'off'
bs_pos: in
bs_z: samp
coll_y: out
cryo_pos: out
det_cov: 'close'
diag_y: out
fl_bright: 'on'
aerotech_x: out
aerotech_y: mount
aerotech_z: mount
aerotech_u: mount
smargon_x: mount
smargon_y: mount
smargon_chi: mount
smargon_phi: mount
xrf_pos: out
maintenance:
allow_modifiers: true
bl_pos: out
bl_bright: 'off'
bs_pos: out
bs_z: safe
coll_y: park
cryo_pos: in
det_cov: 'close'
diag_y: park
fl_bright: 'off'
aerotech_x: out
aerotech_y: mount
aerotech_z: mount
aerotech_u: mount
smargon_x: mount
smargon_y: mount
smargon_chi: mount
smargon_phi: mount
xrf_pos: out
xtal_snapshot:
allow_modifiers: true
bl_pos: in
bl_bright: 'on'
bs_pos: in
bs_z: safe
coll_y: intermediate
cryo_pos: in
det_cov: 'close'
diag_y: out
fl_bright: 'on'
aerotech_x: in
aerotech_y: work
aerotech_z: work
aerotech_u: mount
smargon_x: mount
smargon_y: mount
smargon_chi: mount
smargon_phi: mount
xrf_pos: out
@@ -0,0 +1,18 @@
aerotech_x:
userParameter: {"type": continuous, "in": 0.0, "out": -10.0, "safe": -100, "tol": 0.5}
aerotech_y:
userParameter: {"type": continuous, "mount": 0.0, "work": 0.01, "tol": 0.002}
aerotech_z:
userParameter: {"type": continuous, "mount": 0.0, "work": 0.02, "tol": 0.01}
aerotech_u:
userParameter: {"type": continuous, "mount": 0.0}
smargon_x:
userParameter: {"type": continuous, "mount": 0.0}
smargon_y:
userParameter: {"type": continuous, "mount": 0.0}
smargon_z:
userParameter: {"type": continuous, "mount": 0.0}
smargon_chi:
userParameter: {"type": continuous, "mount": 0.0}
smargon_phi:
userParameter: {"type": continuous, "mount": 0.0}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,130 @@
bl_bright:
description: Backlight Brightness
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BL:SET', auto_monitor: true}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- state
readOnly: False
softwareTrigger: false
userParameter: {"type": continuous, "on": 1.3, "off": 0, “tol”: 0.01}
bl_pos:
description: Backlight Positioner
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BL:POS-SET', auto_monitor: true}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- state
readOnly: False
softwareTrigger: false
userParameter: {"type": discrete, "in": 1, "out": 0}
bs_pos:
description: Beamstop Positioner
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BS:POS-SET', auto_monitor: true}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- state
readOnly: False
softwareTrigger: false
userParameter: {"type": discrete, "in": 1, "out": 0}
bs_z:
description: Beamstop Z
deviceClass: ophyd_devices.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRZ'}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- state
readOnly: False
softwareTrigger: false
userParameter: {"type": continuous, "min": 13, "samp": 15, "work_min": 20, "safe": 23.8, "max_blin": 24, "max_blout": 35}
coll_y:
description: Collimator Y
deviceClass: ophyd_devices.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-COL:TRY'}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- state
readOnly: False
softwareTrigger: false
userParameter: {"type": continuous, "in": 40, "out": 20.0, "park": 0,"tol":0.05}
cryo_pos:
description: Cryo Positioner
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-CS:POS-SET', auto_monitor: true}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- state
readOnly: False
softwareTrigger: false
userParameter: {"type": discrete, "in": 1, "out": 0}
det_cov:
description: Detector Cover
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-DETCOV:SET', auto_monitor: true}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- state
readOnly: False
softwareTrigger: false
userParameter: {"type": discrete, "open": 2, "close": 1}
diag_y:
description: Scintillator/diode Y
deviceClass: ophyd_devices.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SCL:TRY'}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- state
readOnly: False
softwareTrigger: false
userParameter: {"type": continuous, "scint": 25, "i1": 29, "out": 5.0,"park": 0,"tol":0.3}
fl_bright:
description: Frontlight Brightness
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-FL:SET', auto_monitor: true}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- state
readOnly: False
softwareTrigger: false
userParameter: {"type": continuous, "on": 3.0, "off": 0, “tol”: 0.01}
xrf_pos:
description: XRF Positioner
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-XRF:POS-SET', auto_monitor: true}
onFailure: buffer
enabled: True
readoutPriority: baseline
deviceTags:
- state
readOnly: False
softwareTrigger: false
userParameter: {"type": discrete, "in": 1, "out": 0}
+24 -771
View File
@@ -1,777 +1,30 @@
sls_current:
description: sls current
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'ARS07-DPCT-0100:CURR', auto_monitor: true}
base_config:
- !include ./pxiii-standard-devices.yaml
states_config:
- !include ./pxiii-state-devices.yaml
smargon:
description: REST-based device which connects to Smargopolo
deviceClass: pxiii_bec.devices.smargopolo_smargon.Smargon
deviceConfig: {prefix: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
vg0_press:
description: VG0 pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-FE-VMCC-0000:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
abs_press:
description: Absorber pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-FE-ABS1-VMCC-1010:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
sldi_cenx:
description: FE slit-diaphragm horizontal center
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENTERX'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_sizex:
description: FE slit-diaphragm horizontal size
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:SIZEX'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_ceny:
description: FE slit-diaphragm vertical center
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENTERY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_sizey:
description: FE slit-diaphragm vertical size
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:SIZEY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
fecmi_try:
description: FE collimating mirror try
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-MI1:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
fecmi_pitch:
description: FE collimating mirror pitch
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-MI1:PITCH'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
fecmi_bend:
description: FE collimating mirror bend
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-MI1:BEND1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
slh_press:
description: OP slit pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-SLH-VMFR-1010:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
slh_trxr:
description: OP slit inner blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-SLH:TRXR'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
slh_trxw:
description: OP slit outer blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-SLH:TRXW'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
fi1_try:
description: Beam attenuator motion before mono
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-FI1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_theta1:
description: Monochromator pitch 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_diode_top:
description: Top diode between mono crystals
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XPM1:TOP:READOUT', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
dccm_diode_bottom:
description: Bottom diode between mono crystals
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XPM1:BOT:READOUT', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
dccm_theta2:
description: Monochromator pitch 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA2'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_xbpm:
description: XBPM total intensity after monochromator
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XBPM1:SumAll:MeanValue_RBV', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
dccm_energy:
description: Monochromator energy
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:ENERGY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_eoffset:
description: Monochromator energy offset
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:EOFFSET'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxbpm_trx:
description: XBPM motion before secondary source
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxbpm_try:
description: XBPM motion before secondary source
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxbpm:
description: XBPM before secondary source
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-SSBPM1:SumAll:MeanValue_RBV'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
ssslit_trxr:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXR'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssslit_trxw:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXW'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssslit_tryt:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYT'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssslit_tryb:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYB'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxi1_trx:
description: Secondary source diagnostic screen motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxi1_try:
description: Secondary source diagnostic screen motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:TRXU'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:TRXD'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# vfm_tryuw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYUW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# vfm_tryr:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# vfm_trydw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYDW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
vfm_pitch:
description: KB mirror vertical steering
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:PITCH'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:YAW'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:ROLL'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:TRX'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:TRXU'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:TRXD'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# hfm_tryur:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYUR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# hfm_tryw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# hfm_trydr:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYDR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
hfm_pitch:
description: KB mirror horizontal steering
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:PITCH'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:YAW'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:ROLL'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:TRX'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# xbox_xbpm:
# description: Exposure box XBPM
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig: {read_pv: 'X06DA-ES-XBBPM1:SumAll:MeanValue_RBV'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: true
# softwareTrigger: false
xbox_fil1:
description: Exposure box filter wheel 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI1:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil2:
description: Exposure box filter wheel 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI2:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil3:
description: Exposure box filter wheel 3
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI3:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil4:
description: Exposure box filter wheel 4
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI4:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_diode:
description: Exposure box diode
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DI1:READOUT'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
gonpos:
description: Sample sensor distance
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DF1:CBOX-USER1', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
gonvalid:
description: Sample in valid distance
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DF1:CBOX-CMP1', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
samzoom:
description: Sample microscope zoom
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SAMCAM:ZOOM'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
samcam:
description: Sample camera aggregate device
deviceClass: pxiii_bec.devices.SamCamDetector
deviceConfig: {prefix: 'X06DA-SAMCAM:'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
samstream:
description: Sample camera ZMQ stream
deviceClass: pxiii_bec.devices.StdDaqPreviewDetector
deviceConfig:
url: 'tcp://129.129.110.12:9089'
deviceTags:
- detector
enabled: true
readoutPriority: async
readOnly: false
softwareTrigger: false
# samimg:
# description: Sample camera image from EPICS
# deviceClass: pxiii_bec.devices.NDArrayPreview
# deviceConfig:
# prefix: 'X06DA-SAMCAM:image1:'
# deviceTags:
# - detector
# enabled: true
# readoutPriority: async
# readOnly: false
# softwareTrigger: false
bstop_pneum:
description: Beamstop pneumatic in-out
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_x:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_y:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_z:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_pneum:
description: Beamstop pneumatic
deviceClass: pxiii_bec.devices.PneumaticValve
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS', kind: 'config', auto_monitor: true, put_complete: true}
onFailure: buffer
enabled: true
enabled: True
readoutPriority: baseline
readOnly: false
deviceTags:
- smargon
- motors
readOnly: false
softwareTrigger: false
bstop_diode:
description: Beamstop diode
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-BS:READOUT', auto_monitor: true}
aerotech:
description: REST-based device which connects to AareScan and Aerotech
deviceClass: pxiii_bec.devices.aerotech.Aerotech
deviceConfig: {prefix: 'http://mx-x06da-queue-01:5234'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
frontlight:
description: Microscope frontlight
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-FL:SET-BRGHT', kind: 'config', put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
backlight:
description: Backlight reflector
deviceClass: pxiii_bec.devices.PneumaticValve
deviceConfig: {read_pv: 'X06DA-ES-BL:GET-POS', write_pv: 'X06DA-ES-BL:SET-POS', kind: 'config', auto_monitor: true, put_complete: true}
onFailure: buffer
enabled: true
enabled: True
readoutPriority: baseline
readOnly: false
deviceTags:
- aerotech
- motors
readOnly: false
softwareTrigger: false
gmx:
description: ABR horizontal stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMX', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
gmy:
description: ABR vertical stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMY', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
gmz:
description: ABR axial stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMZ', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
omega:
description: ABR rotation stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:OMEGA', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
abr:
description: Aerotech ABR motion system
deviceClass: pxiii_bec.devices.AerotechAbrStage
deviceConfig: {prefix: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
coll_x:
description: Collimator X
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-COL:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
coll_y:
description: Collimator Y
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-COL:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shx:
description: SmarGon X axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: -2, high_limit: 2, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shy:
description: SmarGon Y axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: -2, high_limit: 2, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shz:
description: SmarGon Z axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: 10, high_limit: 22, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
chi:
description: SmarGon CHI axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: 0, high_limit: 40, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
phi:
description: SmarGon PHI axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
det_y:
description: Pilatus height
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
det_z:
description: Pilatus translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
+5 -5
View File
@@ -97,10 +97,10 @@ class AerotechAbrStage(PSIDeviceBase, Device):
# )
# Status flags for all axes
omega_done = Component(EpicsSignalRO, "-DF1:OMEGA-DONE", auto_monitor=True, kind=Kind.normal)
gmx_done = Component(EpicsSignalRO, "-DF1:GMX-DONE", auto_monitor=True, kind=Kind.normal)
gmy_done = Component(EpicsSignalRO, "-DF1:GMY-DONE", auto_monitor=True, kind=Kind.normal)
gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", auto_monitor=True, kind=Kind.normal)
omega_done = Component(EpicsSignalRO, "-DF1:OMEGA-DONE", kind=Kind.normal)
gmx_done = Component(EpicsSignalRO, "-DF1:GMX-DONE", kind=Kind.normal)
gmy_done = Component(EpicsSignalRO, "-DF1:GMY-DONE", kind=Kind.normal)
gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", kind=Kind.normal)
# For some reason the task interface is called PSO...
scan_command = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
@@ -128,7 +128,7 @@ class AerotechAbrStage(PSIDeviceBase, Device):
task2 = Component(EpicsSignalRO, "-AERO:TSK2-DONE", auto_monitor=True)
task3 = Component(EpicsSignalRO, "-AERO:TSK3-DONE", auto_monitor=True)
task4 = Component(EpicsSignalRO, "-AERO:TSK4-DONE", auto_monitor=True)
scan_done = Component(EpicsSignal, "-GRD:SCAN-DONE", auto_monitor=True, kind=Kind.config)
scan_done = Component(EpicsSignal, "-GRD:SCAN-DONE", kind=Kind.config)
def __init__(
self,
+43
View File
@@ -0,0 +1,43 @@
from ophyd import Component as Cpt
from .http import TIMESTAMP_ID, HttpDeviceController, HttpDeviceSignal, HttpOphydDevice
class AerotechController(HttpDeviceController):
_readback_endpoint = "/status"
_target_endpoint = "/position"
def __init__(self, *, prefix, **kwargs):
self._readbacks: dict[str, dict[str, float | bool]] = {}
super().__init__(prefix=prefix, **kwargs)
def put(self, axis: str, val: float):
self._rest_post(body={axis: val})
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
with self._readback_lock:
if axis_id not in self._readbacks or TIMESTAMP_ID not in self._readbacks:
return None
return self._readbacks.get(axis_id)["pos"], self._readbacks.get(TIMESTAMP_ID) # type: ignore
class Aerotech(HttpOphydDevice):
controller_class = AerotechController
x = Cpt(HttpDeviceSignal, axis_identifier="x", tolerance=0.01)
y = Cpt(HttpDeviceSignal, axis_identifier="y", tolerance=0.01)
z = Cpt(HttpDeviceSignal, axis_identifier="z", tolerance=0.01)
u = Cpt(HttpDeviceSignal, axis_identifier="u", tolerance=0.01)
vel_u_deg_s = Cpt(HttpDeviceSignal, axis_identifier="vel_u_deg_s", tolerance=0.01)
def _test():
a = Aerotech(name="aerotech", prefix="http://mx-x06da-queue-01:5234")
a.wait_for_connection()
return a
if __name__ == "__main__":
aerotech = _test()
print(aerotech.read())
aerotech.stop()
+178
View File
@@ -0,0 +1,178 @@
import time
from abc import ABC, abstractmethod
from threading import Event, RLock, Thread
from typing import Any
from ophyd import OphydObject
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.socket import SocketSignal
from requests import Response, Session
TIMESTAMP_ID = "__timestamp"
_POLL_INTERVAL_SLOW = 0.1
class HttpRestError(Exception):
"""Error for rest calls from a HttpRestSignal."""
def __init__(self, resp: Response, *args: object, value: Any | None = None) -> None:
method, url = resp.request.method, resp.request.url
data = f"{str(value)} to " if value is not None else ""
super().__init__(
f"Could not {method} {data}{url}. Code: {resp.status_code}. Reason: {resp.reason}.",
*args,
)
class HttpDeviceController(OphydObject, ABC):
"""Controller to consolidate polling loops and other REST calls for devices which communicate
with HTTP REST interfaces"""
_readback_endpoint: str
_target_endpoint: str
def __init__(self, *, prefix, **kwargs):
self._readbacks: dict
self._session = Session()
self._prefix = prefix
self._targets = {}
self._signal_registry: set[str] = set()
self._readback_poll_interval: float = _POLL_INTERVAL_SLOW
super().__init__(**kwargs)
self._setup_readback()
def _setup_readback(self):
self._stop_monitor_readback_event = Event()
self._readback_lock = RLock()
self._monitor_readback_thread = Thread(
target=self._monitor,
args=[
self._readback_endpoint,
self._stop_monitor_readback_event,
self._readback_lock,
self._readbacks,
],
)
def manual_update(self):
self._update_reading(self._readback_endpoint, self._readback_lock, self._readbacks)
def _update_reading(self, endpoint: str, lock: RLock, buffer: dict):
data = self._rest_get(endpoint)
timestamp = time.monotonic()
with lock:
buffer.update(data)
buffer["__timestamp"] = timestamp
def _monitor(self, endpoint: str, event: Event, lock: RLock, buffer: dict):
while not event.is_set():
self._update_reading(endpoint, lock, buffer)
time.sleep(self._readback_poll_interval)
def _clean_monitor(self):
if self._monitor_readback_thread.is_alive():
self._stop_monitor_readback_event.set()
self._monitor_readback_thread.join(timeout=2)
if self._monitor_readback_thread.is_alive():
raise RuntimeError("Failed to clean up Aerotech monitor thread.")
def register(self, axis_id: str):
self._signal_registry.add(axis_id)
def _rest_get(self, endpoint):
resp = self._session.get(self._prefix + endpoint)
if not resp.ok:
raise HttpRestError(resp)
return resp.json()
def _rest_put(self, params: dict | None = None, body: dict | None = None):
resp = self._session.put(self._prefix + self._target_endpoint, params=params, json=body)
if not resp.ok:
raise HttpRestError(resp, value=params)
def _rest_post(self, params: dict | None = None, body: dict | None = None):
resp = self._session.post(self._prefix + self._target_endpoint, params=params, json=body)
if not resp.ok:
raise HttpRestError(resp, value=params)
def start_monitor(self):
"""Start or restart the automonitor thread."""
self._clean_monitor()
self._setup_readback()
self._monitor_readback_thread.start()
def monitor_stopped(self):
return not self._monitor_readback_thread.is_alive()
def put(self, axis: str, val: float):
self._rest_put({axis: val})
@abstractmethod
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
"""Return a tuple (reading, timestamp) if the axis_id exists"""
def stop(self):
# There doesn't appear to be a stop endpoint on the server
# Best effort: set the target to the current position
pass
# TODO: self._rest_put(self._readbacks)
class HttpDeviceSignal(SocketSignal):
"""Ophyd signal which gets and puts to a REST API rather than EPICS PVs, mediated through the Aerotech
Controller"""
def __init__(self, *args, axis_identifier: str, **kwargs):
super().__init__(*args, **kwargs)
controller: HttpDeviceController | None = getattr(self.root, "controller", None)
if controller is None:
raise TypeError("HttpDeviceSignal must be used in a device with a HttpDeviceController")
self._controller = controller
self._axis_id = axis_identifier
self._controller.register(self._axis_id)
def _socket_get(self): # type: ignore
self._readback, self.metadata["timestamp"] = self._controller.get_readback(
self._axis_id
) or (0.0, 0.0)
return self._readback
def _socket_set(self, val: float):
self._controller.put(self._axis_id, val)
def get(self, **kwargs):
if self._controller.monitor_stopped():
self._controller.start_monitor()
return super().get(**kwargs)
class HttpOphydDevice(PSIDeviceBase):
controller_class: type[HttpDeviceController]
def __init__(
self,
*,
name: str,
prefix: str = "",
scan_info=None,
device_manager=None,
**kwargs,
):
self.controller = self.controller_class(prefix=prefix)
super().__init__(
name=name,
prefix=prefix,
scan_info=scan_info,
device_manager=device_manager,
**kwargs,
)
def wait_for_connection(self, **kwargs): # type: ignore
self.controller.start_monitor()
self.controller.manual_update()
return super().wait_for_connection(**kwargs)
def stop(self, *, success: bool = False) -> None:
self.controller.stop()
return super().stop(success=success)
+42
View File
@@ -0,0 +1,42 @@
from ophyd import Component as Cpt
from ophyd_devices import PSIDeviceBase
from .http import HttpDeviceController, HttpDeviceSignal, HttpOphydDevice
_TIMESTAMP_ID = "__timestamp"
_POLL_INTERVAL_SLOW = 0.1
class SmargonController(HttpDeviceController):
"""Controller to consolidate polling loops and other REST calls for the smargon"""
_readback_endpoint = "/readbackSCS"
_target_endpoint = "/targetSCS"
def __init__(self, *, prefix, **kwargs):
self._readbacks: dict[str, float] = {}
super().__init__(prefix=prefix, **kwargs)
def get_readback(self, axis_id: str) -> tuple[float, float] | None:
with self._readback_lock:
if axis_id not in self._readbacks or _TIMESTAMP_ID not in self._readbacks:
return None
return self._readbacks.get(axis_id), self._readbacks.get(_TIMESTAMP_ID) # type: ignore
def put(self, axis: str, val: float):
self._rest_put(params={axis: val})
def stop(self):
# There doesn't appear to be a stop endpoint on the server
# Best effort: set the target to the current position
self._rest_put(params=self._readbacks)
class Smargon(HttpOphydDevice):
controller_class = SmargonController
x = Cpt(HttpDeviceSignal, axis_identifier="SHX", tolerance=0.01)
y = Cpt(HttpDeviceSignal, axis_identifier="SHY", tolerance=0.01)
z = Cpt(HttpDeviceSignal, axis_identifier="SHZ", tolerance=0.01)
phi = Cpt(HttpDeviceSignal, axis_identifier="PHI", tolerance=0.01)
chi = Cpt(HttpDeviceSignal, axis_identifier="CHI", tolerance=0.01)
+6
View File
@@ -0,0 +1,6 @@
# Macros
This directory is intended to store macros which will be loaded automatically when starting BEC.
Macros are small functions to make repetitive tasks easier. Functions defined in python files in this directory will be accessible from the BEC console.
Please do not put any code outside of function definitions here. If you wish for code to be automatically run when starting BEC, see the startup script at pxiii_bec/bec_ipython_client/startup/post_startup.py
For a guide on writing macros, please see: https://bec.readthedocs.io/en/latest/user/command_line_interface.html#how-to-write-a-macro
View File
+194
View File
@@ -0,0 +1,194 @@
"""Planner to move between beamline statesΩ"""
import time
from collections import defaultdict, deque
# from enums import BeamlineState, TemperatureMode
# from matcher import DeviceMatcher, nonzero_is_on
class StateChangePlanner:
"""Moves devices to the correct positions to achieve a given state"""
def __init__(
self,
devices,
states: dict[BeamlineState, dict[str, str]],
allow_modifiers=None,
deps=None,
stage_timeout=5,
debug=False,
):
self.devices = devices
self.states = states
self.allow_modifiers = allow_modifiers or {}
self.deps = deps
self.stage_timeout = stage_timeout
self.debug = debug
self.modifiers = {
TemperatureMode.CRYO: {"cryo_pos": "in"},
TemperatureMode.ROOM_TEMP: {"cryo_pos": "out"},
}
self.matcher = DeviceMatcher()
# Register rules
self.matcher.register("bl_bright", nonzero_is_on)
self.matcher.register("fl_bright", nonzero_is_on)
# self.matcher.register("bs_z", threshold_rule("safe"))
def _merged_state(self, state, modifier):
target = dict(self.states[state])
if modifier:
if isinstance(modifier, str):
modifier = TemperatureMode(modifier)
if self.allow_modifiers.get(state, False):
target.update(self.modifiers[modifier])
return target
def execute_plan(self, stage, state_name):
"""Execute the planning stage"""
statuses = []
moved = []
start = time.time()
# trigger moves in parallel
for dev, pos in stage:
d = self.devices[dev]
if not d.is_at(pos):
# status = d.mv(pos)
status = d.mv(pos)
statuses.append((dev, pos, status))
moved.append((dev, d, pos))
# wait for all to finish
for dev, pos, status in statuses:
remaining = self.stage_timeout - (time.time() - start)
if remaining <= 0:
raise RuntimeError(f"Stage timeout while moving to {state_name.name}")
try:
status.wait(timeout=remaining)
except Exception:
print(f"\nTimeout waiting for {dev} -> {pos}")
# print("Positions:", self.print_positions())
raise
# optional final verification (recommended for beamlines)
for dev, d, pos in moved:
if not self.matcher.matches(dev, d, pos):
raise RuntimeError(
f"{dev} did not reach position '{pos}' while moving to "
f"{state_name.name}. Check motor status in EPICS."
)
print("Stage complete.")
def move_to(self, state_name, modifier=None):
"""Move devices to the correct positions to achieve a given state"""
if isinstance(state_name, str):
state_name = BeamlineState(state_name)
target = self._merged_state(state_name, modifier)
plan = self._plan(target)
print(len(plan), "stages to reach target state")
# print("PLAN:")
# for i, stage in enumerate(plan):
# print(f"Stage {i + 1}: {stage}")
seq = 1
for stage in plan:
print(f"Stage {seq}: {stage}")
self.execute_plan(stage, state_name)
seq += 1
def available_states(self):
"""Return a list of available states"""
return list(self.states.keys())
def get_positions(self):
"""Return current positions of all SE devices"""
return {name: dev.pos for name, dev in self.devices.items()}
def print_positions(self):
"""Return current state of all devices"""
for name, device in self.devices.items():
print(f"{name:10s} : {device.pos:10s} value: {device.actual}")
def diff_states(self, before):
"""Return a dict of {device: (before, after)} pairs for devices that changed state"""
after = self.get_positions()
return {k: (before[k], after[k]) for k in before if before[k] != after[k]}
def current_state(self):
"""Return all current matching BeamlineState and TemperatureMode combinations,
prioritizing non-None modifiers first."""
matches = [] # Store all matching (state, modifier) pairs
for state in self.states:
# Start with prioritized modifiers: Non-None first, then None.
modifiers = list(self.modifiers.keys())
modifiers.append(None) # Add `None` as a fallback after real modifiers.
for modifier in modifiers:
# Combine state and modifier to get full configuration
config = self._merged_state(state, modifier)
# Check if all devices match their expected positions
all_match = True
for d, expected in config.items():
dev = self.devices[d]
if not self.matcher.matches(d, dev, expected):
all_match = False
break
if all_match:
matches.append((state.name, modifier.name if modifier else None))
return matches if matches else None
def is_state(self, state, modifier=None):
"""Check if the current state matches the given state and modifier."""
actual = self.current_state()
if not actual:
return False
if modifier is None:
# match any modifier
return any(s == state.name for s, _ in actual)
return (state.name, modifier.name) in actual
def _plan(self, target):
graph = defaultdict(set)
indeg = defaultdict(int)
nodes = set()
for dev, pos in target.items():
node = (dev, pos)
nodes.add(node)
for dep in self.deps.get(node, []):
graph[dep].add(node)
indeg[node] += 1
nodes.add(dep)
q = deque(n for n in nodes if indeg[n] == 0)
stages = []
while q:
stage = list(q)
stages.append(stage)
q.clear()
for n in stage:
for m in graph[n]:
indeg[m] -= 1
if indeg[m] == 0:
q.append(m)
if sum(len(s) for s in stages) != len(nodes):
raise RuntimeError("Circular dependency in state dependencies")
return stages
@@ -0,0 +1,21 @@
# from enums import BeamlineState
import yaml
class DefineStatesManager:
@staticmethod
def initialize_states(states_file):
"""
Returns the states and modifiers defined in the specified states file.
"""
with open(states_file, "r", encoding="utf-8") as f:
cfg = yaml.safe_load(f)
states = {}
allow_modifiers = {}
for name, config in cfg["states"].items():
state = BeamlineState(name)
allow_modifiers[state] = config.pop("allow_modifiers", False)
states[state] = config
return states, allow_modifiers
+70
View File
@@ -0,0 +1,70 @@
""" Build the sample environment devices"""
import yaml
# from position_device import PositionDevice
def motor_resolver(bec_name):
candidates = [
bec_name,
bec_name.replace("_", "."),
]
for path in candidates:
try:
obj = dev
for part in path.split("."):
obj = getattr(obj, part)
return obj
except Exception:
pass
raise ValueError(f"Cannot resolve motor for '{bec_name}'")
def build_devices(yaml_file, mock_devices):
""" Build devices from the beamline states yaml"""
state_devices = {}
with open(yaml_file, encoding="utf-8") as f:
data = yaml.safe_load(f)
for bec_name, cfg in data.items():
user = cfg.get("userParameter")
# Skip devices without user parameters
if not user:
continue
tol = user.get("tol", 0.1)
positions = {
k: v for k, v in user.items()
if k not in ("type", "tol")
}
allow_arbitrary = (user["type"] == "continuous")
pos_dev = PositionDevice(
bec_name=bec_name,
mot_device = motor_resolver(bec_name),
positions=positions,
tol=tol,
allow_arbitrary=allow_arbitrary,
use_mock=bec_name in mock_devices,
)
state_devices[bec_name] = pos_dev
return state_devices
+357
View File
@@ -0,0 +1,357 @@
"""Utility functions for calculating energy, wavelength, and Bragg angle."""
from dataclasses import dataclass
import numpy as np
# from pxii_parameters import (EnergyDefaults, CamConversion)
@dataclass(frozen=True)
class Constants:
"""Constants used in energy calculations"""
# # Physical Constants from https://physics.nist.gov/cuu/Constants/index.html
ANGSTROM_CONVERSION = 1e10 # Convert meters to angstrom
PLANCK_CONST_EV = 4.135667696e-15 # eV/Hz
SPEED_OF_LIGHT = 299792458 # m/s
# d-spacings
d_spacing = {120: 3.13481, 298: 3.13562}
def speed_of_light_ang():
"""
Calculate the speed of light in angstroms per second.
Returns:
float: The speed of light converted to angstroms per second.
"""
return Constants.SPEED_OF_LIGHT * Constants.ANGSTROM_CONVERSION
def en_wav_factor():
"""
Calculate the energy wavelength factor.
This function computes a constant factor used to calculate energy
values in relation to wavelength by combining Planck's constant,
in eV/Hz, and the speed of light in angstrom.
Returns:
float: The computed energy wavelength factor.
"""
return Constants.PLANCK_CONST_EV * speed_of_light_ang()
# Helper Functions
def convert_to_degrees(angle_mrad: float) -> float:
"""
Convert an angle from milliradians to degrees.
Args:
angle_mrad: The angle value in milliradians.
Returns:
The angle converted into degrees as a float.
"""
return np.rad2deg(angle_mrad / 1000)
def create_conversion_result(
energy_ev: float, wavelength: float, bragg_angle_mrad: float
) -> dict:
"""
Creates a dictionary containing converted values of energy and angles.
This function takes the energy in electron-volts, the wavelength,
and the Bragg angle in milliradians as input. It computes and
returns a dictionary containing the energy in both electron-volts
and kiloelectron-volts, the wavelength, the Bragg angle in milliradians,
and the Bragg angle converted to degrees.
Args:
energy_ev: Energy value in electron-volts.
wavelength: Wavelength value.
bragg_angle_mrad: Bragg angle in milliradians.
Returns:
dict: A dictionary containing the following keys:
- "energy_kev": Energy value in kiloelectron-volts.
- "energy_ev": Energy value in electron-volts.
- "wavelength": Wavelength value.
- "bragg_angle_mrad": Bragg angle in milliradians.
- "bragg_angle_deg": Bragg angle in degrees.
"""
return {
"energy_kev": energy_ev / 1000,
"energy_ev": energy_ev,
"wavelength": wavelength,
"bragg_angle_mrad": float(bragg_angle_mrad),
"bragg_angle_deg": float(convert_to_degrees(bragg_angle_mrad)),
}
def print_conversion_result(result: dict) -> None:
"""
Prints the energy-related conversion results to the console.
"""
line = (
f"energy: {result['energy_ev']:.6g} eV, energy: {result['energy_kev']:.6g} keV, "
f"wavelength: {result['wavelength']:.4g} Å, "
f"bragg angle: {result['bragg_angle_mrad']:.5g} mrad, {result['bragg_angle_deg']:.4g} deg"
)
print(line)
# Conversion Functions
def calculate_wavelength_from_angle(bragg_angle_mrad: float, temp=120) -> float:
"""
calculate_wavelength_from_angle(bragg_angle_mrad: float) -> float
Arguments:
bragg_angle_mrad: The Bragg angle in milliradians, used to compute the
sine value required for the wavelength calculation.
Returns:
The calculated wavelength as a float value.
"""
d = Constants.d_spacing[temp]
return 2 * d * np.sin(bragg_angle_mrad / 1000)
def calculate_energy_from_wavelength(wavelength: float) -> float:
"""
Calculates the energy of a photon based on its wavelength.
Args:
wavelength: The wavelength of the photon in angstrom.
Returns:
The energy of the photon in eV.
"""
return en_wav_factor() / wavelength
def calculate_wavelength_from_energy(energy_ev: float) -> float:
"""
Calculates the wavelength of a photon from its energy.
Arguments:
energy_ev: float
The energy of the photon in electronvolts (eV).
Returns:
float
The calculated wavelength of the photon in angstrom.
"""
return en_wav_factor() / energy_ev
def calculate_bragg_angle_from_wavelength(wavelength: float, temp=120) -> float:
"""
Calculate the Bragg angle in milliradians for a given wavelength.
Args:
wavelength: The wavelength in angstrom.
Returns:
The Bragg angle in milliradians as a float.
"""
d = Constants.d_spacing[temp]
angle_rad = np.arcsin(wavelength / (2 * d))
return angle_rad * 1000
def convert_input_angle_to_mrad(bragg_angle: float) -> float:
"""
Convert input angle into milliradians (mrad).
This function takes an angle as input and determines its likely unit,
converting it to milliradians (mrad) if necessary. If the input value
is less than 1, it is assumed to be in radians and is converted to
mrad. If the input value falls between predefined minimum and
maximum values for mrad, it is assumed to be in degrees and thus
converted to mrad using the degrees-to-radians conversion factor.
For input values that don't match these scenarios, it assumes
that the input is already in mrad and returns it unchanged.
Arguments:
bragg_angle (float): The input Bragg angle, which can be in
radians, degrees, or milliradians.
Returns:
float: The Bragg angle converted into milliradians (mrad).
"""
if bragg_angle < 1: # Likely the input angle is in radians
return bragg_angle * 1000
if 3 < bragg_angle < 25: # Likely input angle is in degrees
return np.deg2rad(bragg_angle) * 1000
return bragg_angle # Already in mrad
# Core Functions
def validate_energy(energy_ev):
"""
Validates the energy value to ensure it falls within the acceptable range. The function
converts the provided energy from keV to eV if the input value is less than 1/1000 of the
maximum energy value. It then checks whether the energy is within the defined bounds.
If the energy value is outside the acceptable range, the function raises a ValueError.
Args:
energy_ev (float): The energy value in eV or keV to be validated. If this value is
smaller than 1/1000 of the maximum allowed energy (in eV), it will be multiplied
by 1000 to convert it from keV to eV.
Returns:
float: The validated energy value in eV that falls within the acceptable range.
Raises:
ValueError: If the energy value is outside the defined range of
[MIN_ENERGY_EV, MAX_ENERGY_EV].
"""
if energy_ev < EnergyDefaults.max_energy_ev / 1000: # Assuming the input is in keV.
energy_ev *= 1000
if not EnergyDefaults.min_energy_ev <= energy_ev <= EnergyDefaults.max_energy_ev:
raise ValueError(
f"Energy of {energy_ev} eV is outside the valid range "
f"({EnergyDefaults.min_energy_ev} eV to {EnergyDefaults.max_energy_ev} eV)"
)
return energy_ev
def convert_from_bragg(
bragg_angle_mrad: float, temp=120, print_result: bool = False
) -> dict:
"""
Convert the Bragg angle to wavelength and energy, returning the result as a dictionary.
This function converts a given Bragg angle (in milliradians) into the corresponding
wavelength and energy values, and returns them in a dictionary format. The function
also supports optional printing of the calculated results.
Args:
bragg_angle_mrad (float): The Bragg angle in milliradians to be converted.
print_result (bool): Whether to print the conversion result. Defaults to False.
Returns:
dict: A dictionary containing the following keys:
- 'energy_ev': Energy in electronvolts.
- 'wavelength': Wavelength corresponding to the input angle.
- 'bragg_angle_mrad': Input Bragg angle in milliradians.
"""
bragg_angle_mrad = convert_input_angle_to_mrad(bragg_angle_mrad)
wavelength = float(calculate_wavelength_from_angle(bragg_angle_mrad, temp=temp))
energy_ev = float(calculate_energy_from_wavelength(wavelength))
result = create_conversion_result(energy_ev, wavelength, bragg_angle_mrad)
if print_result:
print_conversion_result(result)
return result
def convert_from_energy(energy_ev: float, temp=120, print_result: bool = False) -> dict:
"""
Convert energy in electron volts (eV) to wavelength and Bragg angle in milliradians
(mrad). This method validates the given energy, calculates corresponding properties,
and optionally prints the result.
Args:
energy_ev: Energy value in electron volts (float) to be converted.
print_result: Flag indicating whether to print the resulting
conversion details (bool). Defaults to False.
Returns:
A dictionary containing the following key-value pairs:
- "energy_ev" (float): Validated energy in eV.
- "wavelength" (float): Calculated wavelength in meters.
- "bragg_angle_mrad" (float): Calculated Bragg angle in mrad.
"""
energy_ev = validate_energy(energy_ev)
wavelength = calculate_wavelength_from_energy(energy_ev)
bragg_angle_mrad = float(
calculate_bragg_angle_from_wavelength(wavelength, temp=temp)
)
result = create_conversion_result(energy_ev, wavelength, bragg_angle_mrad)
if print_result:
print_conversion_result(result)
return result
def convert_from_wavelength(
wavelength: float,
temp: float = 120,
print_result: bool = False,
) -> dict:
"""
Convert a given wavelength value into corresponding energy, Bragg angle, and
generate a result dictionary.
The function processes a wavelength value, checks its validity against a
permitted range, calculates corresponding energy and Bragg angle, and
formats the results into a dictionary. Optionally, the function can print
the result.
Parameters:
wavelength: float
The input wavelength value in Angstroms to be converted. Should
fall within the permitted wavelength range.
print_result: bool
Optional flag indicating whether to print the conversion result.
Default is False.
Returns:
dict
A dictionary containing the energy (electron-volts), wavelength
(Angstroms), and Bragg angle (milliradians). If the wavelength is
outside of the permitted range, returns None.
"""
energy_ev = calculate_energy_from_wavelength(wavelength)
bragg_angle_mrad = float(
calculate_bragg_angle_from_wavelength(wavelength, temp=temp)
)
result = create_conversion_result(energy_ev, wavelength, bragg_angle_mrad)
if print_result:
print_conversion_result(result)
return result
def calc_perp_position(
energy_ev: float,
print_result: bool = False,
) -> float:
"""
Calculate the perpendicular motor position based on provided energy in electron-volts (eV).
This function computes the perpendicular motor position using the given energy value in
electron-volts. The calculation is based on the Bragg angle derived from the energy. An optional
parameter allows printing the result during execution.
Parameters:
energy_ev (float): The energy value in electron-volts used for the calculation.
print_result (bool): Flag to determine whether to print the computed perpendicular offset.
Default is False.
Returns:
float: The computed perpendicular position.
Raises:
None
"""
result = convert_from_energy(energy_ev, print_result=False)
bragg_angle_rad = result["bragg_angle_mrad"] / 1000
perp_offset = float(EnergyDefaults.beam_offset / (2 * np.cos(bragg_angle_rad))) - 3
if print_result:
print(f"Perp = {perp_offset: .4f}")
return perp_offset
def calc_scam_microns(pixels, zoom = 1000):
return pixels/(0.5208 * np.exp(0.002586 * zoom))
def calc_scam_microns(pixels, zoom=1000):
"""Convert pixels to microns for the sample camera"""
return float(pixels / (CamConversion.a * np.exp(CamConversion.b * zoom)))
def calc_bsccam_microns(pixels):
"""Convert pixels to microns for the BSC camera"""
return pixels*20
+457
View File
@@ -0,0 +1,457 @@
"""Start of a beamline health checker"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
from datetime import datetime
from bec_lib.device import Signal, Positioner
# -------------------------------------------------------------------
# Status Enum
# -------------------------------------------------------------------
class Status(Enum):
"""Define standard statuses"""
OK = 0
WARNING = 1
ERROR = 2
UNKNOWN = 3
@property
def color(self):
return {
Status.OK: "green",
Status.WARNING: "yellow",
Status.ERROR: "red",
Status.UNKNOWN: "blue",
}[self]
@property
def color_scilog(self):
return {Status.OK: "green", Status.WARNING: "", Status.ERROR: "red", Status.UNKNOWN: ""}[
self
]
# -------------------------------------------------------------------
# Health Result Object
# -------------------------------------------------------------------
@dataclass
class HealthCheckResult:
"""Define the output of the health check"""
name: str
description: str
status: Status
value: Any = None
message: str = ""
category: str = "general"
def __str__(self):
if self.status == Status.OK:
return f"[{self.status.name}] {self.description}"
return f"[{self.status.name}] " f"{self.description}: {self.message}"
def formatted_message(self):
if self.status == Status.OK:
return f"[{self.status.name}] {self.name}"
return f"[{self.status.name}] " f"{self.description}: {self.message}"
# -------------------------------------------------------------------
# Send to SciLog
# -------------------------------------------------------------------
def send_to_scilog(results):
"""Make a scilog entry of the health check"""
counts = {Status.OK: 0, Status.WARNING: 0, Status.ERROR: 0, Status.UNKNOWN: 0}
for result in results:
counts[result.status] += 1
timestamp = datetime.now().strftime("%Y/%m/%d %H:%M")
msg = bec.messaging.scilog.new()
msg.add_text(f"Beamline Health Summary {timestamp}", bold=True)
msg.add_text("\n")
for status, count in counts.items():
msg.add_text(f"{status.name:<10}: {count}", bold=True)
msg.add_text("\n")
for result in results:
msg.add_text(
result.formatted_message(),
# bold=result.status != Status.OK,
color=result.status.color_scilog,
)
msg.add_text("\n")
msg.add_tags(["beamline health check"])
msg.send()
# -------------------------------------------------------------------
# Configuration
# -------------------------------------------------------------------
@dataclass
class BeamlineHealthConfig:
"""Define some rules to check against"""
signal_rules: dict[str, Callable] = field(
default_factory=lambda: {"cam": lambda x: x != 0, "bpm": lambda x: x != 0}
)
motor_tolerances: dict[str, float] = field(
default_factory=lambda: {
# examples
# "mono_theta": 0.001,
# "detector_z": 0.1,
}
)
default_motor_tolerance: float = 0.02
# -------------------------------------------------------------------
# Device Collection
# -------------------------------------------------------------------
def get_devices():
"""Return a list of all the beamline devices"""
return list(dev.items())
# -------------------------------------------------------------------
# Signal Checks
# -------------------------------------------------------------------
def check_signals(devices, config: BeamlineHealthConfig):
"""Check the signal devices"""
results = []
signal_devices = [(name, obj) for name, obj in devices if isinstance(obj, Signal)]
for name, obj in signal_devices:
try:
data = obj.read()
actual = data[name]["value"]
description = obj.description
except Exception as e:
results.append(
HealthCheckResult(
name=name,
description=name,
status=Status.UNKNOWN,
message=f"Failed to read signal: {e}",
category="signals",
)
)
continue
matched = False
for keyword, rule in config.signal_rules.items():
if keyword in name:
matched = True
try:
passed = rule(actual)
except Exception as e:
results.append(
HealthCheckResult(
name=name,
description=name,
status=Status.UNKNOWN,
value=actual,
message=f"Rule evaluation failed: {e}",
category="signals",
)
)
break
if passed:
results.append(
HealthCheckResult(
name=name,
description=description,
status=Status.OK,
value=actual,
category="signals",
)
)
else:
results.append(
HealthCheckResult(
name=name,
description=description,
status=Status.ERROR,
value=actual,
message=f"Signal value {actual} failed validation",
category="signals",
)
)
break
if not matched:
continue
return results
# -------------------------------------------------------------------
# Motor Checks
# -------------------------------------------------------------------
def check_motors(devices, config: BeamlineHealthConfig):
"""Check the standard motor devices"""
results = []
motor_devices = [(name, obj) for name, obj in devices if isinstance(obj, Positioner)]
for name, obj in motor_devices:
try:
data = obj.read()
description = obj.description
actual = data[name]["value"]
error_code = obj.motor_status.get()
move_state = obj.motor_is_moving.get()
except Exception as e:
results.append(
HealthCheckResult(
name=name,
description=name,
status=Status.UNKNOWN,
message=f"Failed to read motor: {e}",
category="motors",
)
)
continue
# -----------------------------------------------------------
# Error state
# -----------------------------------------------------------
if error_code != 0:
results.append(
HealthCheckResult(
name=name,
description=description,
status=Status.ERROR,
value=error_code,
message=f"motor error code: {error_code}",
category="motors",
)
)
continue
# -----------------------------------------------------------
# Moving state
# -----------------------------------------------------------
if move_state != 0:
results.append(
HealthCheckResult(
name=name,
description=description,
status=Status.WARNING,
value=move_state,
message="motor is currently moving",
category="motors",
)
)
continue
# -----------------------------------------------------------
# Setpoint comparison
# -----------------------------------------------------------
sp_key = f"{name}_user_setpoint"
if sp_key in data:
setpoint = data[sp_key]["value"]
diff = abs(actual - setpoint)
tolerance = config.motor_tolerances.get(name, config.default_motor_tolerance)
if diff > tolerance:
results.append(
HealthCheckResult(
name=name,
description=description,
status=Status.WARNING,
value=diff,
message=(
f"Setpoint {setpoint:.5g} differs "
f"from readback {actual:.5g} "
f"by {diff:.4g}"
),
category="motors",
)
)
else:
results.append(
HealthCheckResult(
name=name,
description=description,
status=Status.OK,
value=actual,
category="motors",
)
)
else:
results.append(
HealthCheckResult(
name=name,
description=description,
status=Status.UNKNOWN,
message="No setpoint available",
category="motors",
)
)
return results
# -------------------------------------------------------------------
# Main Check Entry Point
# -------------------------------------------------------------------
def check2(config: BeamlineHealthConfig | None = None):
"""Perform the checks"""
if config is None:
config = BeamlineHealthConfig()
devices = get_devices()
results = []
results.extend(check_signals(devices, config))
results.extend(check_motors(devices, config))
# ---------------------------------------------------------------
# Sort by severity
# ---------------------------------------------------------------
results.sort(key=lambda r: r.status.value)
return results
# -------------------------------------------------------------------
# Summary Printer
# -------------------------------------------------------------------
def summary_text(results):
"""Summarise the results in a text table"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
n_ok = sum(r.status == Status.OK for r in results)
n_warn = sum(r.status == Status.WARNING for r in results)
n_err = sum(r.status == Status.ERROR for r in results)
n_unknown = sum(r.status == Status.UNKNOWN for r in results)
return (
f"==========================================\n"
f"Beamline Health Check at {timestamp}\n"
f"==========================================\n"
f"OK : {n_ok}\n"
f"WARNING : {n_warn}\n"
f"ERROR : {n_err}\n"
f"UNKNOWN : {n_unknown}\n"
"==========================================\n"
)
# return "\n".join(lines)
# -------------------------------------------------------------------
# Filter results
# -------------------------------------------------------------------
def filter_results(results, statuses=None):
"""Filter the results"""
if statuses is None:
return results
return [r for r in results if r.status in statuses]
# -------------------------------------------------------------------
# CLI Entry Point
# -------------------------------------------------------------------
def run_check(show_all=False):
"""Runs the checks and outputs the results"""
results = check2()
print(summary_text(results))
problem_results = filter_results(
results, statuses={Status.WARNING, Status.ERROR, Status.UNKNOWN}
)
send_to_scilog(results)
if not show_all:
results = filter_results(results, statuses={Status.WARNING, Status.ERROR, Status.UNKNOWN})
for result in results:
print(result)
+36
View File
@@ -0,0 +1,36 @@
"""Planner dependencies"""
def planner_deps():
"""Define the dependencies between beamline positions"""
return {
("bs_z", "samp"): [
("aerotech_x", "out"),
("diag_y", "out"),
("coll_y", "out"),
],
("aerotech_x", "in"): [
("diag_y", "out"),
("bs_z", "safe"),
],
("aerotech_x", "out"): [
("diag_y", "out"),
("bs_z", "safe"),
],
("diag_y", "scint"): [
("aerotech_x", "out"),
("bs_z", "safe"),
("cryo_pos", "out"),
],
("diag_y", "i1"): [
("aerotech_x", "out"),
("bs_z", "safe"),
("cryo_pos", "out"),
],
("bs_pos", "out"): [("bs_z", "safe")],
("bs_pos", "in"): [("bs_z", "safe")],
("diag_y", "out"): [("bs_z", "safe")],
("diag_y", "park"): [("bs_z", "safe")],
("coll_y", "out"): [("bs_z", "safe")],
("coll_y", "park"): [("bs_z", "safe")],
("coll_y", "in"): [("bs_z", "safe")],
("coll_y", "intermediate"): [("bs_z", "safe")],
}
+22
View File
@@ -0,0 +1,22 @@
"""
This module manages the initialization of devices."""
# from guards import attach_guards
# from policies import attach_policies
# from build_devices import build_devices
class DeviceManager:
"""Class for building devices and attaching safety guards and policies."""
@staticmethod
def initialize_devices(state_devices_file, rest_devices_file, mock_devices):
"""
Initializes sample environment devices from the specified file.
"""
devices = build_devices(state_devices_file, mock_devices)
rest_devices = build_devices(rest_devices_file, mock_devices)
devices.update(rest_devices)
attach_guards(devices)
attach_policies(devices)
return devices
+22
View File
@@ -0,0 +1,22 @@
"""Enums for beamline states"""
from enum import Enum
class BeamlineState(str, Enum):
"""List of beamline states"""
ROBOT_SAMPLE_EXCHANGE = "robot_sample_exchange"
SAMPLE_ALIGNMENT = "sample_alignment"
DATA_COLLECTION = "data_collection"
DC_XRF = "DC_XRF"
MANUAL_SAMPLE_EXCHANGE = "manual_sample_exchange"
BEAM_VISUALISATION = "beam_visualisation"
FLUX_MEASUREMENT = "flux_measurement"
BEAMSTOP_ALIGNMENT = "beamstop_alignment"
MAINTENANCE = "maintenance"
XTAL_SNAPSHOT = "xtal_snapshot"
class TemperatureMode(str, Enum):
"""List of temperature modes"""
CRYO = "cryo"
ROOM_TEMP = "room_temp"
+95
View File
@@ -0,0 +1,95 @@
"""Setup guards for devices."""
class GuardViolation(Exception):
"""Raised when a guarded move is not allowed."""
class AtPositionGuard:
"""Guard that checks if a device is in a specific position."""
def __init__(self, device, position):
self.device = device
self.pos = position
def check(self):
"""Check if the device is in the specified position."""
if self.device.pos != self.pos:
raise GuardViolation(
f"{self.device.bec_name} must be in the '{self.pos}' position"
)
# print("move allowed")
return True
def requirement(self):
"""Return the requirement for the guard."""
return (self.device.bec_name, self.pos)
class MinMaxGuard:
"""Guard that checks if a device is within a specific range."""
def __init__(self, device, limit_value, direction):
self.device = device
self.limit_value = limit_value
self.direction = direction # direction: 'max' or 'min'
def check(self):
"""Check if the device is within the specified range."""
if self.direction == "less_than":
if not (self.device.actual - self.device.tol) <= self.limit_value:
raise GuardViolation(
f"{self.device.bec_name} must be less than or equal to {self.limit_value} mm"
)
elif self.direction == "more_than":
if not (self.device.actual + self.device.tol) >= self.limit_value:
raise GuardViolation(
f"{self.device.bec_name} must be greater than or equal to {self.limit_value} mm"
)
else:
raise ValueError(
f"Invalid direction '{self.direction}'. Use 'less_than' or 'more_than'."
)
# print("move allowed")
return True
def requirement(self):
"""Return the requirement for the guard."""
# planner cannot handle numeric constraints directly
# return None -> planner ignores
return None
def guards_setup(d):
"""Define guards for devices."""
guards = {}
guards["bs_safe"] = AtPositionGuard(d["bs_z"], position="safe")
guards["bs_max_blin"] = MinMaxGuard(
d["bs_z"], direction="less_than", limit_value=d["bs_z"].positions["max_blin"]
)
guards["bs_work_min"] = MinMaxGuard(
d["bs_z"], direction="more_than", limit_value=d["bs_z"].positions["work_min"]
)
guards["bs_pos_in"] = AtPositionGuard(d["bs_pos"], position="in")
guards["gonx_out"] = MinMaxGuard(
d["aerotech_x"], direction="less_than", limit_value=d["aerotech_x"].positions["out"]
)
guards["gonx_safe"] = AtPositionGuard(d["aerotech_x"], position="safe")
guards["diag_y_out"] = MinMaxGuard(
d["diag_y"], direction="less_than", limit_value=d["diag_y"].positions["out"]
)
guards["coll_y_out"] = MinMaxGuard(
d["coll_y"], direction="less_than", limit_value=d["coll_y"].positions["out"]
)
return guards
def attach_guards(d):
"""Attach guards to devices."""
g = guards_setup(d)
d["diag_y"].guards.append(g["bs_work_min"].check)
d["bl_pos"].guards.append(g["bs_max_blin"].check)
d["bs_pos"].guards.append(g["bs_safe"].check)
d["bs_z"].guards.append(g["bs_pos_in"].check)
d["coll_y"].guards.append(g["bs_work_min"].check)
+97
View File
@@ -0,0 +1,97 @@
"""Initialise sample environment devices and beamline states"""
from dataclasses import dataclass
# from devices_manager import DeviceManager
# from beamline_state_manager import DefineStatesManager
# from dependencies import planner_deps
# from beamline_planner import StateChangePlanner
@dataclass
class Environment:
device_mocks: dict[str, bool] = None
def __post_init__(self):
if self.device_mocks is None:
self.device_mocks = {
"aerotech_x": "mock",
"aerotech_y": "mock",
"aerotech_z": "mock",
"aerotech_u": "mock",
"bl_bright": "mock",
"bl_pos": "mock",
"bs_pos": "mock",
"bs_z": "mock",
"coll_y": "mock",
"cryo_pos": "mock",
"det_cov": "mock",
"diag_y": "mock",
"fl_bright": "mock",
"smargon_x": "mock",
"smargon_y": "mock",
"smargon_z": "mock",
"smargon_chi": "mock",
"smargon_phi": "mock",
"xrf_pos": "mock",
}
mocks = sorted(
name
for name, backend in self.device_mocks.items()
if backend == "mock"
)
reals = sorted(
name
for name, backend in self.device_mocks.items()
if backend == "real"
)
print(f"Mock devices ({len(mocks)}): {mocks}")
print(f"Real devices ({len(reals)}): {reals}")
devdir = "/sls/x06da/config/bec/production/pxiii_bec/pxiii_bec/device_configs/"
state_devices_file: str = devdir + "pxiii-state-devices.yaml"
rest_devices_file: str = devdir + "pxiii-rest-devices.yaml"
states_file: str = devdir + "beamline_states.yaml"
@property
def mock_devices(self):
mock_names = set()
for name, device in self.device_mocks.items():
if device == "mock":
mock_names.add(name)
return mock_names
def init_beamline_environment():
"""
Initializes the beamline with real or mock devices.
"""
env = Environment()
# Initialize devices
device_manager = DeviceManager()
devices = device_manager.initialize_devices(
env.state_devices_file,
env.rest_devices_file,
env.mock_devices
)
# Initialize states
state_manager = DefineStatesManager()
states, allow_modifiers = state_manager.initialize_states(env.states_file)
# Setup dependencies
deps = planner_deps()
# Setup planner
planner = StateChangePlanner(devices, states, allow_modifiers, deps)
print("Initializing beamline state planner")
return devices, planner
+63
View File
@@ -0,0 +1,63 @@
"""Used for checking device positions match current state"""
class DeviceMatcher:
"""Class for checking device positions match current state"""
def __init__(self):
self._rules = {}
def register(self, device_name, func):
"""Register a matching function for a device."""
self._rules[device_name] = func
def matches(self, device_name, device, expected):
"""Return True if device matches expected state."""
if expected is None:
return True # "don't care"
if device_name in self._rules:
return self._rules[device_name](device, expected)
# default fallback
return device.is_at(expected)
def explain(self, device_name, device, expected):
"""Return True if device matches expected state."""
val = device.readback.get()
if device_name in self._rules:
ok = self._rules[device_name](device, expected)
else:
ok = device.is_at(expected)
return ok, val
def nonzero_is_on(device, expected, eps=1e-6):
"""Define that anything > 0 is on"""
val = device.actual
if expected == "off":
return abs(val) < eps
if expected == "on":
return val > eps
return abs(val - expected) < eps
# def threshold_rule(param_name):
# def _rule(device, expected):
# val = device.actual
#
# if expected == param_name:
# threshold = device.position
# return val >= threshold
#
# return device.is_at(expected)
#
# return _rule
#
#
# def within_tolerance(tol):
# def _rule(device, expected):
# val = device.readback.get()
# return abs(val - expected) < tol
#
# return _rule
+252
View File
@@ -0,0 +1,252 @@
"""Get data from an h5 file or BEC history and perform fitting."""
import numpy as np
from lmfit.models import (
GaussianModel,
LorentzianModel,
VoigtModel,
ConstantModel,
LinearModel,
)
from scipy.ndimage import gaussian_filter1d
import h5py
import matplotlib.pyplot as plt
def create_fit_parameters(
deriv: bool = False,
model: str = "Voigt",
baseline: str = "Linear",
smoothing: None = None,
):
"""Store the fit parameters in a dictionary."""
# map input model to lmfit model name
model_mappings = {
"Gaussian": GaussianModel,
"Lorentzian": LorentzianModel,
"Voigt": VoigtModel,
"Constant": ConstantModel,
"Linear": LinearModel,
}
return {
"deriv": deriv,
"model": model_mappings[model],
"baseline": model_mappings[baseline],
"smoothing": smoothing,
}
def get_data_from_h5(signal_name: str = "lu_bpmsum"):
"""Get data from an h5 file."""
with h5py.File("scan_676.h5", "r") as f:
entry = f["entry"]["collection"]
y_data = entry["devices"][signal_name][signal_name]["value"][:]
motor_data = entry["metadata"]["bec"]
motor_name = motor_data["scan_motors"][0].decode()
scan_number = motor_data["scan_number"][()]
x_data = entry["devices"][motor_name][motor_name]["value"][:]
return {
"x_data": x_data,
"y_data": y_data,
"signal_name": signal_name,
"motor_name": motor_name,
"scan_number": str(scan_number),
}
def get_data_from_history(
history_index: int,
signal_name: str = "lu_bpmsum",
):
"""Read data from the BEC history and return the X and Y data as arrays."""
scan = bec.history[history_index]
md = scan.metadata["bec"]
motor_name = md["scan_motors"][0].decode()
scan_number = md["scan_number"]
x_data = scan.devices[motor_name][motor_name].read()["value"]
y_data = scan.devices[signal_name][signal_name].read()["value"]
return {
"signal_name": signal_name,
"x_data": x_data,
"y_data": y_data,
"motor_name": motor_name,
"scan_number": scan_number,
}
def process_data(data, fit_params):
"""
Process the signal data for fitting based on derivative or smoothing.
"""
smoothing, deriv = fit_params["smoothing"], fit_params["deriv"]
signal_name = data["signal_name"]
y_data = data["y_data"]
if deriv:
if smoothing:
y_smooth = gaussian_filter1d(y_data, smoothing)
fitting_data = np.gradient(y_smooth)
signal_name = f"Derivative of smoothed {signal_name}"
else:
fitting_data = np.gradient(y_data)
signal_name = f"Derivative of {signal_name}"
elif smoothing and smoothing > 0.01:
fitting_data = gaussian_filter1d(y_data, smoothing)
signal_name = f"Smoothed {signal_name}"
else:
fitting_data = y_data
updated_data = {
"y_to_fit": fitting_data,
"signal_name": signal_name,
}
data.update(updated_data)
return data
def fit(data, fit_params):
"""Fit a signal to a model and return the fitting results."""
# Create the model
peak_model = fit_params["model"](prefix="peak_")
baseline_model = fit_params["baseline"](prefix="base_")
full_model = peak_model + baseline_model
# Prepare data
processed_data = process_data(data, fit_params)
params = full_model.make_params()
y_min = np.min(processed_data["y_to_fit"])
# Configure baseline parameters
if fit_params["baseline"] == ConstantModel:
params["base_c"].set(value=y_min)
elif fit_params["baseline"] == LinearModel:
params["base_intercept"].set(value=y_min)
params["base_slope"].set(value=0)
# Add peak-specific parameters
params.update(
peak_model.guess(processed_data["y_to_fit"], x=processed_data["x_data"])
)
# Perform the fitting
lmfit_result = full_model.fit(
processed_data["y_to_fit"], params, x=processed_data["x_data"]
)
# Find the X that gives the max Y
max_index = np.argmax(processed_data["y_to_fit"])
x_max = processed_data["x_data"][max_index]
# Generate data for a smoothed fit curve
fit_xdata = np.linspace(np.min(data["x_data"]), np.max(data["x_data"]), 500)
fit_ydata = lmfit_result.eval(x=fit_xdata, params=lmfit_result.params)
# Collect results
return {
"model": fit_params["model"].__name__,
"fwhm": lmfit_result.params["peak_fwhm"].value,
"centre": lmfit_result.best_values["peak_center"],
"height": lmfit_result.params["peak_height"].value,
"chi_sq": lmfit_result.chisqr,
"lmfit_result": lmfit_result,
"x_max": x_max,
"fit_xdata": fit_xdata,
"fit_ydata": fit_ydata,
}
def plot_fitted_data(data, fit_result):
"""Plot the original data and the fitted model."""
plt.plot(data["x_data"], data["y_to_fit"], label="Data")
plt.plot(fit_result['fit_xdata'], fit_result['fit_ydata'], label="Fit")
plt.xlabel(data["motor_name"])
plt.ylabel(data["signal_name"])
plt.title(f"Scan {data['scan_number']}, fitted with {fit_result['model']}")
plt.grid(True)
plt.legend()
plt.show()
def select_bec_window(dock_area_name="Fitting"):
"""Check to see if the fitting results dock is already open and re-create it if not"""
open_docks = bec.gui.windows
if open_docks.get(dock_area_name) is None:
dock_area = bec.gui.new(dock_area_name)
# wf = dock_area.new("Plot").new(bec.gui.available_widgets.Waveform)
wf = dock_area.new(widget='Waveform', object_name='Plot')
text_box = dock_area.new(widget='TextBox', object_name="Results", where="bottom")
else:
wf = bec.gui.Fitting.Plot
text_box = bec.gui.Fitting.Results
return wf, text_box
def plot_live_data_bec(
motor_name,
signal_name,
window_name="Fitting"
):
"""
Plotting live data for motor and signal using BEC.
This function plots live data from a specified motor and signal.
It clears the current plot window, sets its title, labels the axes
with the provided motor and signal names, and initializes live plotting
on the given signal against the motor.
Args:
motor_name (str): The name of the motor to be used as the x-axis.
signal_name (str): The name of the signal to be used as the y-axis.
Returns:
None
"""
wf, text_box = select_bec_window(window_name)
text_box.set_plain_text("Plotting live data")
wf.clear_all()
wf.title = "Scan: Live scan"
wf.x_label = motor_name
wf.y_label = signal_name
wf.plot(device_x=motor_name, device_y=signal_name)
def plot_fitted_data_bec(
data,
fit_result,
):
"""
Plot fitted data and display fitting parameters in the specified window.
This function selects a BEC window and plots the original data along with the
fitted function. Additionally, it displays the fitting results in a text
box within the same window for better visualization of the fit results.
Parameters:
data : dict
Dictionary containing the original dataset, where 'x_data' and 'y_to_fit'
hold the independent variable and the dependent variable, respectively,
'scan_number' represents the scan number, 'motor_name' and 'signal_name'
provide axis labels.
fit_result : dict
Dictionary containing the results of the fit, including parameters such
as 'centre', 'fwhm', 'height', and the fitted model stored under
'lmfit_result', with its 'best_fit' attribute representing the fitted data.
"""
wf, text_box = select_bec_window()
fit_text = (
f"Fit parameters: Centre = {fit_result['centre']:.4f}, "
f"FWHM = {fit_result['fwhm']:.3f}, "
f"Height = {fit_result['height']:.4f}\n"
f"Model = {fit_result['model']}\n"
f"Chi sq = {fit_result['chi_sq']:.3g}"
)
text_box.set_plain_text(fit_text)
wf.clear_all()
wf.title = f"Scan: {data['scan_number']}"
wf.x_label = data["motor_name"]
wf.y_label = data["signal_name"]
wf.plot(x=data["x_data"], y=data["y_to_fit"], label="Data")
wf.plot(x=fit_result["fit_xdata"], y=fit_result["fit_ydata"], label="Fit")
# wf.Fit.set(symbol_size = 0)
wf.get_curve('Fit').set(symbol_size=0)
+321
View File
@@ -0,0 +1,321 @@
"""Use the methods in mx_basics to perform:
1) a go_to_peak scan, that scans a motor, finds the peak position and moves to peak
2) fits data from a bec history file
"""
from dataclasses import dataclass
import numpy as np
# from pxiii_parameters import FitDefaults, BPMScans, MirrorConfig
# from mx_basics import (
# create_fit_parameters,
# get_data_from_history,
# fit,
# plot_fitted_data_bec,
# plot_live_data_bec,
# )
# Method functions
def calculate_step_size(start: float, stop: float, steps: int) -> float:
"""
Provides the function to calculate the step size for dividing a specified range
into a given number of steps.
Args:
start: The starting value of the range.
stop: The stopping value of the range.
steps: The number of steps to divide the range into. Must be at least 1.
Raises:
ValueError: If the steps value is less than 1.
Returns:
The calculated step size as a float, rounded to three decimal places.
"""
if steps < 1:
raise ValueError("Number of steps must be at least 1.")
return round((stop - start) / steps, 3)
def move_to_position(motor_device, motor_name: str, position: float, data: dict):
"""
Function to move a specified motor device to a given position.
The function verifies if the requested position is within the scan range of the
motor device provided. If the position is outside the range, the motor is
moved to the center of its scan range, an error message is raised, and the
operation is halted. If the position is valid, the motor is moved to the
specified position.
Parameters:
motor_device: The motor device to be moved.
motor_name: str
The name of the motor as a string
position: float
The desired position to move the motor to. Position should be within
the scan range of the motor determined by the provided data.
data: dict
A dictionary containing "x_data", which is used to determine the
scan range of the motor.
Raises:
ValueError: Raised if the specified position is outside the valid scan
range determined by "x_data" in the data dictionary. The motor will
return to the center of its scan range in this case.
"""
motor_min = np.min(data["x_data"])
motor_max = np.max(data["x_data"])
motor_centre = (motor_max + motor_min) / 2
if not motor_min <= position <= motor_max:
scans.umv(motor_device, motor_centre, relative=False)
msg = (
f"Position {position: .2f} is outside the scan range of "
f"{motor_min: .2f} to {motor_max: .2f}. "
f"Returning to centre of scan range {motor_centre: .3f}."
)
raise ValueError(msg)
motor_position = round(position, 4)
scans.umv(motor_device, motor_position, relative=False)
print(f"\n Moving {motor_name} to position {motor_position: .3f}")
# @dataclass(frozen=True)
# class FitDefaults:
# """Default values for fitting routines"""
# # Constants for default models, baselines, and parameters
# MODEL = "Voigt"
# BASELINE = "Linear"
# SETTLE_TIME = 0.1
# RELATIVE_MODE = True
def go_to_peak(
motor_device,
signal_device,
start: float,
stop: float,
steps: int,
relative: bool = FitDefaults.RELATIVE_MODE,
plot: bool = True,
settle: float = FitDefaults.SETTLE_TIME,
confirm: bool = True,
gomax: bool = False,
):
"""
Go to the peak of a signal by scanning a motor within a specified range and
identifying the optimal position based on signal peak data.
Parameters:
motor_device: The motor device to be scanned.
signal_device: The signal device to monitor during the scan.
start (float): The starting position of the scan. Ignored if `relative` is True.
stop (float): The ending position of the scan. Ignored if `relative` is True.
steps (int): The number of steps to divide the scan range into.
relative (bool, optional): If True, interpret `start` and `stop` as relative to
the current motor position. Defaults to RELATIVE_MODE constant.
plot (bool, optional): If True, plot the scan data and the fitted results.
Defaults to True.
settle (float, optional): The time in seconds to wait after each step for the
signal to stabilize. Defaults to DEFAULT_SETTLE_TIME constant.
confirm (bool, optional): If True, ask for user confirmation before starting
the scan. Defaults to True.
Raises:
Exception: Raises exceptions potentially raised by dependent functions or
operations such as plotting, fitting, or motor movement.
Returns:
None
"""
motor_name = motor_device.name
signal_name = signal_device.name
# wf.plot(x_name=motor_name, y_name=signal_name)
if plot:
plot_live_data_bec(motor_name, signal_name)
# Validate and calculate step size
step_size = calculate_step_size(start, stop, steps)
# Confirm the scan range
# current_motor_position = motor_device.user_readback.get()
current_motor_position = motor_device.read()[motor_name]["value"]
if confirm:
if relative:
scan_start = current_motor_position + start
scan_end = current_motor_position + stop
print(
f"\nScanning from {scan_start: .6g} to {scan_end: .6g} in "
f"{steps} steps of size {step_size}"
)
print(f"Relative mode = {relative}")
else:
print(
f"\nScanning from {start: .5g} to {stop: .5g} in {steps} steps of size {step_size}"
)
print(f"Relative mode = {relative}")
input("Press Enter to continue...")
# Perform the scan
scan_result = scans.line_scan(
motor_device, start, stop, steps=steps, relative=relative, settling_time=settle
)
motor_data = scan_result.scan.live_data[motor_name][motor_name].val
signal_data = scan_result.scan.live_data[signal_name][signal_name].val
scan_number = "Current"
data = {
"x_data": np.array(motor_data),
"y_data": np.array(signal_data),
"motor_name": motor_name,
"signal_name": signal_name,
"motor_device": motor_device,
"scan_number": scan_number,
}
# Define and fit model to scan data
fit_params = create_fit_parameters(False, FitDefaults.MODEL, FitDefaults.BASELINE)
fit_result = fit(data, fit_params)
# Plot the fitted data if plot = True
if plot:
plot_fitted_data_bec(data, fit_result)
# If gomax is set then move to the maximum value, rather than the fit centre
if gomax:
value = fit_result["x_max"]
print(f"Max position is at {value}")
move_to_position(data["motor_device"], data["motor_name"], fit_result["x_max"], data)
else:
# Safely move the motor to the peak position
move_to_position(data["motor_device"], data["motor_name"], fit_result["centre"], data)
def fit_history(
history_index: int,
signal_name: str,
deriv: bool = False,
model: str = FitDefaults.MODEL,
move_to_peak: bool = False,
):
"""
Retrieve and analyze historical data by fitting a model, optionally moving to
a peak position.
Parameters:
history_index (int): Index of the historical data set to retrieve.
signal_name (str): Name of the signal to fit.
deriv (bool, optional): Whether to include the derivative in the fitting
procedure. Defaults to False.
model (str, optional): Name of the model to use for fitting. Defaults to
DEFAULT_MODEL.
move_to_peak (bool, optional): Whether to move the motor to the peak position
after fitting. Defaults to False.
Raises:
KeyError: If required keys are not found in the retrieved data dictionary.
ValueError: If the fitting process fails or produces invalid results.
Returns:
None
"""
# Retrieve historical data
data = get_data_from_history(history_index, signal_name)
# Define fitting parameters
fit_params = create_fit_parameters(deriv, model, FitDefaults.BASELINE)
# Perform fit and plot the data
fit_result = fit(data, fit_params)
plot_fitted_data_bec(data, fit_result)
# Optionally move the motor to the peak position
if move_to_peak:
move_to_position(data["motor_device"], data["motor_name"], fit_result["centre"], data)
def scan_bpm(bpmname):
"""
Runs a grid scan of a BPM in x and y, and plots each channel
as a heatmap.
Parameters:
bpmname: the name of the bpm to be scanned e.g. "fe"
"""
# Open a dock area and set up the heatmaps
dock_area = bec.gui.new("XBPM_Scan")
wf5 = dock_area.new("Sum").new(bec.gui.available_widgets.Heatmap)
wf1 = dock_area.new("Ch1", relative_to="Sum", position="bottom").new(
bec.gui.available_widgets.Heatmap
)
wf3 = dock_area.new("Ch3", relative_to="Ch1", position="right").new(
bec.gui.available_widgets.Heatmap
)
wf4 = dock_area.new("Ch4", relative_to="Ch3", position="bottom").new(
bec.gui.available_widgets.Heatmap
)
wf2 = dock_area.new("Ch2", relative_to="Ch1", position="bottom").new(
bec.gui.available_widgets.Heatmap
)
wfscan = dock_area.new("ScanControl").new(bec.gui.available_widgets.ScanControl)
cfg = getattr(BPMScans, bpmname)
wf1.x_label = cfg["x_name"]
wf1.y_label = cfg["y_name"]
wf1.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z1_name"], color_map="plasma")
wf2.x_label = cfg["x_name"]
wf2.y_label = cfg["y_name"]
wf2.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z2_name"], color_map="plasma")
wf3.x_label = cfg["x_name"]
wf3.y_label = cfg["y_name"]
wf3.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z3_name"], color_map="plasma")
wf4.x_label = cfg["x_name"]
wf4.y_label = cfg["y_name"]
wf4.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z4_name"], color_map="plasma")
wf5.x_label = cfg["x_name"]
wf5.y_label = cfg["y_name"]
wf5.plot(x_name=cfg["x_name"], y_name=cfg["y_name"], z_name=cfg["z5_name"], color_map="plasma")
# Run the scan
x_mot = cfg["x_device"]
y_mot = cfg["y_device"]
# scans.grid_scan(x_mot, -0.5, 0.5, 20, y_mot, -0.5, 0.5, 20,
# exp_time=0.5, relative=False, snaked=True)
def optimise_kb(mirror):
"""
Runs a grid scan of a the upstream and downstream benders,
and plots a heatmap of the sample camera x or y sigma.
Parameters:
mirror: either "hfm" or :vfm"
"""
# Open a dock area and set up the heatmaps
dock_area = bec.gui.new(mirror)
wf1 = dock_area.new("Heatmap").new(bec.gui.available_widgets.Heatmap)
wfscan = dock_area.new("ScanControl").new(bec.gui.available_widgets.ScanControl)
cfg = getattr(MirrorConfig, mirror)
wf1.x_label = cfg["bu_name"]
wf1.y_label = cfg["bd_name"]
wf1.plot(x_name=cfg["bu_name"], y_name=cfg["bd_name"], z_name=cfg["z_name"], color_map="plasma")
# Run the scan
x_mot = cfg["x_device"]
y_mot = cfg["y_device"]
# scans.grid_scan(x_mot, -0.02, 0.02, 11, y_mot, -0.02, 0.02, 11,
# exp_time=0.5, relative=True, snaked=True)
+77
View File
@@ -0,0 +1,77 @@
"""Define guard policies for devices in the beamline."""
# from guards import GuardViolation, guards_setup
def is_sample_area_clear_for_beamstop(d):
"""Check if the sample area is clear of diag_y, coll_y, and gonx"""
g = guards_setup(d)
if g["diag_y_out"].check() and g["coll_y_out"].check() and g["gonx_out"].check():
return True
return False
def is_sample_area_clear_for_gonx(d):
"""Check if the sample area is clear of diag_y and bs_z"""
g = guards_setup(d)
if g["diag_y_out"].check() and g["bs_work_min"].check():
return True
return False
def make_aerotech_x_policy(d):
"""Create the policy for aerotech_x"""
def aerotech_x_policy(target):
cfg = d["aerotech_x"].positions
if target >= cfg["out"] and not is_sample_area_clear_for_gonx(d):
raise GuardViolation("Sample area is not clear")
return aerotech_x_policy
def make_bs_z_policy(d):
"""Create the policy for bs_z"""
def bs_z_policy(target):
"""Checks that the target position is within limits"""
cfg = d["bs_z"].positions
# Lower bound
if target < cfg["work_min"] and not is_sample_area_clear_for_beamstop(d):
raise GuardViolation("Sample area is not clear")
if target < cfg["min"]:
raise GuardViolation(
f"Requested beamstop Z {target} is below recommended minimum {cfg['min']}"
)
# Upper bound
if d["bl_pos"].pos == "in" and target > cfg["max_blin"]:
raise GuardViolation(
f"Beamstop Z cannot move beyond {cfg['max_blin']} when backlight is IN"
)
return bs_z_policy
def make_diag_y_policy(d):
"""Create the policy for diag_y"""
def diag_y_policy(target):
cfg = d["diag_y"].positions
# Don't move in if the goniometer is in
if d["aerotech_x"].actual >= d['aerotech_x'].positions['in'] and target > cfg["out"]:
raise GuardViolation(
f"Diagnostic device cannot move beyond {cfg['out']} mm when goniometer is not OUT"
)
# Don't move if cryocooler is in
if d['cryo_pos'].pos == 'in' and target > cfg['out']:
raise GuardViolation(
f"Diagnostic device cannot move beyond {cfg['out']} mm when cryocooler is IN"
)
return diag_y_policy
def attach_policies(d):
"""Attach the policies to the devices"""
d["bs_z"].policy = make_bs_z_policy(d)
d["aerotech_x"].policy = make_aerotech_x_policy(d)
d["diag_y"].policy = make_diag_y_policy(d)
+254
View File
@@ -0,0 +1,254 @@
"""Set up the positioned devices for mock or real motors"""
from dataclasses import dataclass, field
from typing import Callable, List, Dict, Optional, Union
import time
class SimpleStatus:
"""Makes a mock motor return a status"""
def __init__(self, motor, target, delay=0.0, success=True, name=""):
self.motor = motor
self.target = target
self.delay = delay
self._success = success
self.name = name
self._done = False
def wait(self, timeout=None):
start = time.time()
while True:
# simulate motion completion
if not self._done:
if time.time() - start >= self.delay:
if self._success:
self.motor.position = self.target
self._done = True
if self._done:
if not self._success:
raise RuntimeError(f"Motor {self.name} failed")
return True
if timeout is not None and (time.time() - start) > timeout:
raise TimeoutError(f"Timeout waiting for {self.name}")
time.sleep(0.01)
class MotorAdapter:
"""Motor adapter for setting up mock/real motors"""
def move(self, pos: float):
"""Move the motor to the given position"""
raise NotImplementedError
def move_with_status(self, pos: float):
"""Move the motor to the given position with a status"""
raise NotImplementedError
def set_fail(self, value: bool):
"""Put the motor into a failure state"""
raise NotImplementedError
@property
def actual(self) -> float:
"""The actual position of the motor"""
raise NotImplementedError
class MockMotorAdapter(MotorAdapter):
"""Motor adapter for mock motors"""
def __init__(self, name: str):
self._motor = PositionDevice.MockMotor(name)
def move(self, pos: float):
"""Move the motor to the given position"""
self._motor.move(pos)
def move_with_status(self, pos: float):
"""Move the motor to the given position with a status"""
if self._motor.fail:
return SimpleStatus(
motor=self._motor,
target=pos,
delay=self._motor.delay,
success=False,
name=self._motor.name,
)
# don't update position immediately
return SimpleStatus(
motor=self._motor,
target=pos,
delay=self._motor.delay,
success=True,
name=self._motor.name,
)
def set_fail(self, value: bool):
"""Put the motor into a failure state"""
self._motor.fail = value
def set_delay(self, value: float):
self._motor.delay = value
@property
def actual(self) -> float:
"""The actual position of the motor"""
return self._motor.position
class RealMotorAdapter(MotorAdapter):
"""Motor adapter for real motors"""
def __init__(self, mot, name):
self._motor = mot
self._name = name
def move(self, pos: float):
"""Move the motor to the given position"""
scans.umv(self._motor, pos, relative=False)
def move_with_status(self, pos: float):
"""Move the motor to the given position with a status"""
return scans.mv(self._motor, pos, relative=False)
def set_fail(self, value: bool):
"""Put the motor into a failure state"""
raise NotImplementedError
@property
def actual(self) -> float:
"""The actual position of the motor"""
return self._motor.read()[self._name]["value"]
@dataclass
class PositionDevice:
"""Generic device that moves between named or numeric positions"""
bec_name: str
mot_device: any = None
positions: Dict[str, float] = field(default_factory=dict)
tol: float = 0.1
# device_timeout = 3
guards: List[Callable[[], None]] = field(default_factory=list)
policy: Optional[Callable[[float], None]] = None
allow_arbitrary: bool = False
use_mock: bool = True
def __post_init__(self):
if self.use_mock:
self.mot = MockMotorAdapter(self.bec_name)
else:
# real_motor = getattr(dev, self.bec_name)
self.mot = RealMotorAdapter(self.mot_device, self.bec_name)
# self.mot = RealMotorAdapter(real_motor, self.bec_name)
# self.mot = getattr(dev, self.bec_name)
# Normalize position names
self.positions = {k.lower(): v for k, v in self.positions.items()}
def _check_guards(self):
"""Check if guards exist"""
for g in self.guards:
g()
def _resolve_target(self, target: Union[str, float]) -> float:
"""Convert target into a motor position"""
if isinstance(target, str):
name = target.lower()
if name not in self.positions:
raise ValueError(f"Unknown position '{target}'")
return self.positions[name]
if isinstance(target, (float, int)):
if not self.allow_arbitrary:
raise ValueError(f"{self.bec_name} only accepts named positions")
return float(target)
raise TypeError("Target must be str or float")
def move(self, target: Union[str, float]):
"""Move devices"""
pos = self._resolve_target(target)
self._check_guards()
if self.policy:
self.policy(pos)
self.mot.move(pos)
def mv(self, target: Union[str, float]):
"""move devices with a timeout"""
pos = self._resolve_target(target)
self._check_guards()
if self.policy:
self.policy(pos)
# status.wait(self.device_timeout)
return self.mot.move_with_status(pos)
def set_position(self, target: Union[str, float]):
"""Only to be used for testing purposes, bypasses guards"""
pos = self._resolve_target(target)
self.mot.move(pos)
@property
def actual(self) -> float:
"""Return the actual position of the device."""
# return self.mot.read()[self.bec_name]["value"]
return self.mot.actual
@property
def pos(self) -> str:
"""Return the closest matching position"""
for name, pos in self.positions.items():
if abs(self.actual - pos) <= self.tol:
return name
return "unknown"
#
def is_at(self, target: Union[str, float]) -> bool:
"""Return True if the device is at the given position."""
pos = self._resolve_target(target)
return abs(self.actual - pos) <= self.tol
# -------------------------
# Mock Motor Implementation
# -------------------------
class MockMotor:
"""Mock motor implementation"""
def __init__(self, name: str):
self.name = name
self.position = 0.0
self._target = self.position
self.fail = False
self.delay = 0
#
def move(self, pos: float):
"""Move the motor to the given position."""
if self.fail:
return
# raise RuntimeError(f"Motor {self.name} failed")
time.sleep(self.delay)
self.position = pos
+251
View File
@@ -0,0 +1,251 @@
"""Script to change energy at PXIII by setting DCCM motors and mirror stripe
Moving DCCM motors - implemented for dccm_theta1 and dccm_theta2
Mirrors - change of mirror stripe is not yet implemented
Plotting optional
"""
import pandas as pd
import numpy as np
# from mx_methods import go_to_peak
# from pxiii_parameters import EnergyDefaults
# from calculator import (
# validate_energy,
# convert_from_bragg,
# convert_from_energy,
# )
def get_current_energy():
"""
Returns the energy in eV from the current bragg angle.
"""
# current_bragg_angle = dev.dccm_theta1.user_readback.get()
current_bragg_angle = -EnergyDefaults.energy.user_readback.get()
current_energy = convert_from_bragg(current_bragg_angle, print_result=False)[
"energy_ev"
]
return current_energy
# Functions below are common to all beamlines
def calculate_energy_difference(current_energy, target_energy):
"""
Calculate the absolute difference in energy between the current energy level
and the target energy level.
"""
return abs(target_energy - current_energy)
def interpolate_column(energy_ev, x_values, y_values):
"""
Perform interpolation for a specific column of data.
This function uses numpy's interpolation method to perform linear
interpolation. It calculates the interpolated y-values corresponding
to the given energy in electron-volts (energy_ev), based on specified
x-values and y-values.
Parameters:
energy_ev (array-like): Array of energy values in electron-volts
at which interpolation is needed.
x_values (array-like): Array of x-values corresponding to known
data points.
y_values (array-like): Array of y-values corresponding to known
data points.
Returns:
numpy.ndarray: Interpolated y-values computed for the energy_ev
input.
"""
return np.interp(energy_ev, x_values, y_values)
def get_value_from_lut(energy_ev):
"""
Retrieve interpolated values from a Lookup Table (LUT) based on the provided energy
in electron volts (eV).
This function reads a CSV file containing energy lookup data and processes the LUT
to return interpolated values for specified relevant columns. The interpolation is
performed for the provided energy value using the LUT's energy and data values.
Args:
energy_ev (float): The energy value in electron volts for which the interpolated data is
required.
Returns:
dict: A dictionary where the keys are the relevant column names from the LUT, and the values
are the interpolated values as floats.
"""
energy_lookup_data = pd.read_csv(EnergyDefaults.LUT_table)
column_names = energy_lookup_data.columns.tolist()
lut_values = energy_lookup_data.values.astype(float).T
# Filter relevant columns for interpolation
relevant_columns = {"dccm_pitch"}
int_values = {}
for i, col_name in enumerate(column_names):
if col_name in relevant_columns:
int_values[col_name] = float(
interpolate_column(energy_ev, lut_values[0], lut_values[i])
)
return int_values
def get_mirror_stripe(energy_ev):
"""
Determines the mirror stripe material based on the energy level provided.
This function evaluates the given energy level in electron volts (eV) and
identifies the material type that corresponds to the specified thresholds
for silicon, rhodium, and, if applicable, platinum.
Args:
energy_ev (float): Energy level in electron volts, used to determine
the corresponding material type.
Returns:
str: A string indicating the material type corresponding to the provided
energy level. Possible values are "silicon", "rhodium", or "platinum".
"""
if energy_ev <= EnergyDefaults.stripe_thresholds["silicon"]:
return "silicon"
if (
EnergyDefaults.stripe_thresholds["silicon"]
< energy_ev
<= EnergyDefaults.stripe_thresholds["rhodium"]
):
return "rhodium"
return "platinum"
def set_mirror_stripe(energy_ev):
"""
Selects and sets the appropriate mirror stripe based on the given energy value
in electron volts (eV). Prints the selected mirror stripe.
Args:
energy_ev (float): The energy value in electron volts used to determine
the appropriate mirror stripe.
Returns:
None
"""
selected_stripe = get_mirror_stripe(energy_ev)
print(f"Selected mirror stripe: {selected_stripe}")
def mono_pitch_scan(plot=True):
"""Scan the monochromator pitch and move to the peak."""
if plot:
print("Scanning monochromator pitch and moving to peak, with plotting.")
go_to_peak(
EnergyDefaults.mono_pitch,
EnergyDefaults.signals["sig1"],
-EnergyDefaults.pitch_scan["halfwidth"],
EnergyDefaults.pitch_scan["halfwidth"],
steps=EnergyDefaults.pitch_scan["steps"],
relative=True,
settle=0.01,
plot=True,
confirm=False,
)
else:
print("Scanning monochromator pitch and moving to peak, without plotting.")
go_to_peak(
EnergyDefaults.mono_pitch,
EnergyDefaults.signals["sig1"],
-EnergyDefaults.pitch_scan["halfwidth"],
EnergyDefaults.pitch_scan["halfwidth"],
steps=EnergyDefaults.pitch_scan["steps"],
relative=True,
settle=0.01,
plot=False,
confirm=False,
)
def get_dccm_motors_positions(energy_ev):
"""
Retrieve the positions of DCCM motors based on given energy value.
The function returns a dictionary containing all
calculated motor positions.
Arguments:
energy_ev (float): The energy value in electron volts for which the
DCCM motor positions are to be calculated.
Returns:
dict: A dictionary containing the calculated DCCM motor positions
including values retrieved from the lookup table, if applicable.
"""
# dccm_motor_values = get_value_from_lut(energy_ev)
th1_angle = -convert_from_energy(energy_ev, print_result=False)["bragg_angle_deg"]
th2_angle = convert_from_energy(energy_ev, temp=298, print_result=False)["bragg_angle_deg"]
# dccm_motor_values.update({"theta1_angle": th1_angle, "theta2_angle": th2_angle})
dccm_motor_values = {"theta1_angle": th1_angle, "theta2_angle": th2_angle}
return dccm_motor_values
def move_dccm_motors(energy_ev):
"""
Move the DCCM theta1 and theta2 motors to the required positions
for the given energy in eV.
"""
dccm_pos = get_dccm_motors_positions(energy_ev)
print(
f"Moving DCCM theta1: {dccm_pos['theta1_angle']: .5g} deg, theta2: {dccm_pos['theta2_angle']: .5g} deg, "
# f"DCM pitch: {dcm_pos['dcm_pitch']: .5g} mrad, "
)
umv(EnergyDefaults.energy, dccm_pos["theta1_angle"],
EnergyDefaults.mono_pitch, dccm_pos["theta2_angle"])
def bl_energy(energy_ev, plot=True):
"""
Adjusts the beamline's energy to the specified energy in electron volts (eV).
The function validates the target energy, checks the current energy, and makes
adjustments only if the energy difference is significant enough. It performs
necessary operations including changing DCCM motors, updating
the mirror stripe, and scanning to find the optimal DCCM pitch of 2nd crystal..
Args:
energy_ev: Target energy in electron volts to which the beamline should be adjusted.
plot: Boolean flag indicating whether to plot the DCCM pitch scan for finding the peak.
Returns:
None
"""
energy_ev = validate_energy(energy_ev) # Ensure energy is valid.
# Check current energy to avoid unnecessary adjustments.
current_energy = get_current_energy()
energy_diff = calculate_energy_difference(current_energy, energy_ev)
if energy_diff <= EnergyDefaults.min_energy_change:
print(
f"Energy change of {energy_diff:.2f} eV is too small, not changing energy."
)
return
# Step 1: Move and set the DCCM motors.
move_dccm_motors(energy_ev)
# Step 2: Update the mirror stripe.
set_mirror_stripe(energy_ev)
# Step 3: Perform DCCM pitch scan and move to peak.
if plot:
mono_pitch_scan(plot=True)
else:
mono_pitch_scan(plot=False)
+80
View File
@@ -0,0 +1,80 @@
"""File to store beamline parameters and defaults"""
from dataclasses import dataclass
import numpy as np
@dataclass(frozen=True)
class FitDefaults:
"""Default values for fitting routines"""
# Constants for default models, baselines, and parameters
MODEL = "Voigt"
BASELINE = "Linear"
SETTLE_TIME = 0.1
RELATIVE_MODE = True
@dataclass(frozen=True)
class EnergyDefaults:
"""Parameters for PXIII energy changes"""
min_energy_change = 1
min_energy_ev = 4500
max_energy_ev = 15000
signals = {
"sig1": dev.dccm_di_top,
"sig2": dev.dccm_bpmsum,
"sig3": dev.ss_bpmsum,
# "sig4": dev.xbox_xbpm,
}
energy = dev.dccm_theta1
mono_pitch = dev.dccm_theta2
# LUT_table = "luts/energy_lut.csv"
stripe_thresholds = {"silicon": 9000, "rhodium": 40000}
pitch_scan = {"halfwidth": 0.15, "steps": 30}
@dataclass(frozen=True)
class CamConversion:
"""Convert pixels to microns for sam cam"""
a = 0.5208
b = 0.002586
@dataclass(frozen=True)
class BPMScans:
"""Define the names of the motors and bpm channels"""
ss = {
"x_name": dev.ss_bpm_x.name,
"y_name": dev.ss_bpm_y.name,
"z1_name": dev.ss_bpm1.name,
"z2_name": dev.ss_bpm2.name,
"z3_name": dev.ss_bpm3.name,
"z4_name": dev.ss_bpm3.name,
"z5_name": dev.ss_bpmsum.name,
"x_device": dev.ss_bpm_x,
"y_device": dev.ss_bpm_y,
}
# @dataclass(frozen=True)
# class MirrorConfig:
# """Define the names of the mirror channels"""
# hfm = {
# "bu_name": dev.hfm_bu.name,
# "bd_name": dev.hfm_bd.name,
# "z_name": dev.samcam_xsig.name,
# "x_device": dev.hfm_bu,
# "y_device": dev.hfm_bd,
# }
# vfm = {
# "bu_name": dev.vfm_bu.name,
# "bd_name": dev.vfm_bd.name,
# "z_name": dev.samcam_ysig.name,
# "x_device": dev.vfm_bu,
# "y_device": dev.vfm_bd,
# }
@@ -0,0 +1,12 @@
"""
Scan components for pxiii_bec.
The scan components module allows you to define custom components that can be used in your scans.
These components can be used to encapsulate reusable logic, interact with devices, or perform specific actions during the scan lifecycle.
"""
from bec_server.scan_server.scans.scan_components import ScanComponents
class PxiiiBecScanComponents(ScanComponents):
"""Scan components for pxiii_bec."""
@@ -0,0 +1,33 @@
"""
Scan modifier plugin for pxiii_bec.
The scan modifier allows you to modify the scan lifecycle and run custom actions before or after the scan hook or replace the scan hook entirely.
Note that the scan_modifier module must be registered as a plugin in the pyproject.toml file for it to be recognized by the BEC framework and that
there can only be one scan_modifier plugin registered at a time. If you need to run multiple scan modifiers, you can create a single scan
modifier plugin that runs multiple actions in sequence with conditional logic to determine which actions to run based on the scan context.
"""
from bec_server.scan_server.scans.scan_modifier import ScanModifier, scan_hook_impl
class PxiiiBecScanModifier(ScanModifier):
"""
Scan modifier for pxiii_bec.
By inheriting from the ScanModifier base class, you get access to currently running scan (self.scan), the devices (self.dev), the scan info (self.scan_info),
the scan components (self.components) and the scan actions (self.actions).
"""
def __init__(self, **kwargs):
"""Initialize the scan modifier."""
super().__init__(**kwargs)
# Example of running code before the scan stage for a specific scan
# @scan_hook_impl("stage", "before")
# def before_stage(self):
# """Run before the stage hook."""
# self.actions.send_client_info("Custom stage logic executed by ScanModifier.")
# if self.scan_info.scan_name == "example_scan":
# self.dev.samx.set(20)
+4 -3
View File
@@ -16,7 +16,8 @@ def alignment_fit_and_plot(
Args:
history_index (int): scan to fetch, e.g. -1 for the most recent scan
device_name (str): the device for which to get the monitoring data
signal_name (str | None): the signal to plot, if different from the device name.
smoothing_sigma (float): the sigma for the Gaussian smoothing filter
"""
# Fetch scan data from the history
# by default, signal = device name, unless otherwise specified
@@ -29,9 +30,9 @@ def alignment_fit_and_plot(
# Create a plot and a text box to display results
dock_area = bec.gui.new()
wf = dock_area.new().new(bec.gui.available_widgets.Waveform)
wf = dock_area.new(bec.gui.available_widgets.Waveform)
wf.title = f"Scan {md['scan_number']}: {md['scan_name']} of {motor_name}"
text = dock_area.new(position="right").new(widget=bec.gui.available_widgets.TextBox)
text = dock_area.new(bec.gui.available_widgets.TextBox, where="right")
# Calculate some processed data and add everything to the plot
wf.plot(data, label="Raw data")
+23 -15
View File
@@ -5,6 +5,18 @@
import time
def _device_name(device):
return device.name if hasattr(device, "name") else str(device)
def _get_or_create_scan_window(name="CurrentScan"):
window = bec.gui.windows.get(name)
if window is None:
return bec.gui.new(name)
window.delete_all()
return window
def rock(steps, exp_time, scan_start=None, scan_end=None, datasource=None, visual=True, **kwargs):
"""Demo step scan with plotting
@@ -26,22 +38,18 @@ def rock(steps, exp_time, scan_start=None, scan_end=None, datasource=None, visua
scan_end = 0.05 / dev.dccm_energy.user_readback.get()
if visual:
# Get or create scan specific window
window = None
for _, val in bec.gui.windows.items():
if val.title == "CurrentScan":
window = val.widget
window.clear_all()
if window is None:
window = bec.gui.new("CurrentScan")
# Get or create scan specific window.
window = _get_or_create_scan_window("CurrentScan")
motor_name = _device_name(motor)
datasource_name = _device_name(datasource)
# Draw a simploe plot in the window
dock = window.add_dock(f"ScanDisplay {motor}")
plt1 = dock.add_widget("BECWaveformWidget")
plt1.plot(x_name=motor, y_name=datasource)
plt1.set_x_label(motor)
plt1.set_y_label(datasource)
plt1.add_dap(motor, datasource, dap="LinearModel")
# Draw a waveform plot in the window.
plt1 = window.new(
bec.gui.available_widgets.Waveform, object_name=f"ScanDisplay_{motor_name}"
)
plt1.plot(device_x=motor_name, device_y=datasource_name, dap="LinearModel")
plt1.x_label = motor_name
plt1.y_label = datasource_name
window.show()
print("Handing over to 'scans.line_scan'")
+3 -4
View File
@@ -17,14 +17,13 @@ def scan_theta2(scan_start, scan_end, stepno, exp):
# # Create a plot to display the results
dock_area = bec.gui.new()
wf = dock_area.new().new(bec.gui.available_widgets.Waveform)
wf = dock_area.new(bec.gui.available_widgets.Waveform)
wf.title = f"Scan of DCCM_theta2"
wf.plot(x_name='dccm_theta2', y_name='dccm_xbpm')
wf.add_dap_curve(device_label='dccm_xbpm-dccm_xbpm', dap_name='GaussianModel')
wf.plot(device_x="dccm_theta2", device_y="dccm_xbpm", label="dccm_xbpm-dccm_xbpm")
dap_xbpm = wf.add_dap_curve(device_label="dccm_xbpm-dccm_xbpm", dap_name="GaussianModel")
print(dap_xbpm.dap_params)
+23 -15
View File
@@ -6,6 +6,18 @@ def bl_check_beam():
return True
def _device_name(device):
return device.name if hasattr(device, "name") else str(device)
def _get_or_create_scan_window(name="CurrentScan"):
window = bec.gui.windows.get(name)
if window is None:
return bec.gui.new(name)
window.delete_all()
return window
def ascan(
motor, scan_start, scan_end, steps, exp_time, plot=None, visual=True, relative=False, **kwargs
):
@@ -23,22 +35,18 @@ def ascan(
raise RuntimeError("Beamline is not in ready state")
if visual:
# Get or create scan specific window
window = None
for _, val in bec.gui.windows.items():
if val.title == "CurrentScan":
window = val.widget
window.clear_all()
if window is None:
window = bec.gui.new("CurrentScan")
# Get or create scan specific window.
window = _get_or_create_scan_window("CurrentScan")
motor_name = _device_name(motor)
plot_name = _device_name(plot)
# Draw a simploe plot in the window
dock = window.add_dock(f"ScanDisplay {motor}")
plt1 = dock.add_widget("BECWaveformWidget")
plt1.plot(x_name=motor, y_name=plot)
plt1.set_x_label(motor)
plt1.set_y_label(plot)
plt1.add_dap(motor, plot, dap="LinearModel")
# Draw a waveform plot in the window.
plt1 = window.new(
bec.gui.available_widgets.Waveform, object_name=f"ScanDisplay_{motor_name}"
)
plt1.plot(device_x=motor_name, device_y=plot_name, dap="LinearModel")
plt1.x_label = motor_name
plt1.y_label = plot_name
window.show()
print("Handing over to 'scans.line_scan'")
+4 -1
View File
@@ -6,7 +6,7 @@ build-backend = "hatchling.build"
name = "pxiii_bec"
version = "0.0.0"
description = "A plugin repository for BEC"
requires-python = ">=3.10"
requires-python = ">=3.11"
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",
@@ -51,6 +51,9 @@ plugin_file_writer = "pxiii_bec.file_writer"
[project.entry-points."bec.scans"]
plugin_scans = "pxiii_bec.scans"
[project.entry-points."bec.scans.scan_modifier"]
plugin_scan_modifier = "pxiii_bec.scans.scan_customization.scan_modifier"
[project.entry-points."bec.scans.metadata_schema"]
plugin_metadata_schema = "pxiii_bec.scans.metadata_schema"