Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d1e035e01e | |||
| d49d9cef33 | |||
|
a62fe77b56
|
|||
| e642cbaae8 | |||
| 37bfc1c2a2 | |||
| 63ded2bf7d | |||
| 0c3e7acea6 | |||
| e724ff4869 | |||
| 2e52f6b274 | |||
| dbda93ee12 |
9
.copier-answers.yml
Normal file
9
.copier-answers.yml
Normal file
@@ -0,0 +1,9 @@
|
||||
# Do not edit this file!
|
||||
# It is needed to track the repo template version, and editing may break things.
|
||||
# This file will be overwritten by copier on template updates.
|
||||
|
||||
_commit: v1.2.8
|
||||
_src_path: https://github.com/bec-project/plugin_copier_template.git
|
||||
make_commit: false
|
||||
project_name: addams_bec
|
||||
widget_plugins_input: []
|
||||
102
.gitea/workflows/ci.yml
Normal file
102
.gitea/workflows/ci.yml
Normal file
@@ -0,0 +1,102 @@
|
||||
name: CI for addams_bec
|
||||
on:
|
||||
push:
|
||||
pull_request:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
BEC_WIDGETS_BRANCH:
|
||||
description: "Branch of BEC Widgets to install"
|
||||
required: false
|
||||
type: string
|
||||
default: "main"
|
||||
BEC_CORE_BRANCH:
|
||||
description: "Branch of BEC Core to install"
|
||||
required: false
|
||||
type: string
|
||||
default: "main"
|
||||
OPHYD_DEVICES_BRANCH:
|
||||
description: "Branch of Ophyd Devices to install"
|
||||
required: false
|
||||
type: string
|
||||
default: "main"
|
||||
BEC_PLUGIN_REPO_BRANCH:
|
||||
description: "Branch of the BEC Plugin Repository to install"
|
||||
required: false
|
||||
type: string
|
||||
default: "main"
|
||||
PYTHON_VERSION:
|
||||
description: "Python version to use"
|
||||
required: false
|
||||
type: string
|
||||
default: "3.12"
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
QTWEBENGINE_DISABLE_SANDBOX: 1
|
||||
QT_QPA_PLATFORM: "offscreen"
|
||||
|
||||
steps:
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "${{ inputs.PYTHON_VERSION || '3.12' }}"
|
||||
|
||||
- name: Checkout BEC Plugin Repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/addams_bec
|
||||
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
|
||||
path: ./addams_bec
|
||||
|
||||
- name: Lint for merge conflicts from template updates
|
||||
shell: bash
|
||||
# Find all Copier conflicts except this line
|
||||
run: '! grep -r "<<<<<<< before updating" | grep -v "grep -r \"<<<<<<< before updating"'
|
||||
|
||||
- name: Checkout BEC Core
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/bec
|
||||
ref: "${{ inputs.BEC_CORE_BRANCH || 'main' }}"
|
||||
path: ./bec
|
||||
|
||||
- name: Checkout Ophyd Devices
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/ophyd_devices
|
||||
ref: "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}"
|
||||
path: ./ophyd_devices
|
||||
|
||||
- name: Checkout BEC Widgets
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/bec_widgets
|
||||
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
|
||||
path: ./bec_widgets
|
||||
|
||||
- name: Install dependencies
|
||||
shell: bash
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
|
||||
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
|
||||
|
||||
- name: Install Python dependencies
|
||||
shell: bash
|
||||
run: |
|
||||
pip install uv
|
||||
uv pip install --system -e ./ophyd_devices
|
||||
uv pip install --system -e ./bec/bec_lib[dev]
|
||||
uv pip install --system -e ./bec/bec_ipython_client
|
||||
uv pip install --system -e ./bec/bec_server[dev]
|
||||
uv pip install --system -e ./bec_widgets[dev,pyside6]
|
||||
uv pip install --system -e ./addams_bec
|
||||
|
||||
- name: Run Pytest with Coverage
|
||||
id: coverage
|
||||
run: pytest --random-order --cov=./addams_bec --cov-config=./addams_bec/pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail ./addams_bec/tests/ || test $? -eq 5
|
||||
62
.gitea/workflows/create_update_pr.yml
Normal file
62
.gitea/workflows/create_update_pr.yml
Normal file
@@ -0,0 +1,62 @@
|
||||
name: Create template upgrade PR for addams_bec
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
create_update_branch_and_pr:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
|
||||
steps:
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.12'
|
||||
|
||||
- name: Install tools
|
||||
run: |
|
||||
pip install copier PySide6
|
||||
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Perform update
|
||||
run: |
|
||||
git config --global user.email "bec_ci_staging@psi.ch"
|
||||
git config --global user.name "BEC automated CI"
|
||||
|
||||
branch="chore/update-template-$(python -m uuid)"
|
||||
echo "switching to branch $branch"
|
||||
git checkout -b $branch
|
||||
|
||||
echo "Running copier update..."
|
||||
output="$(copier update --trust --defaults --conflict inline 2>&1)"
|
||||
echo "$output"
|
||||
msg="$(printf '%s\n' "$output" | head -n 1)"
|
||||
|
||||
if ! grep -q "make_commit: true" .copier-answers.yml ; then
|
||||
echo "Autocommit not made, committing..."
|
||||
git add -A
|
||||
git commit -a -m "$msg"
|
||||
fi
|
||||
|
||||
if diff-index --quiet HEAD ; then
|
||||
echo "No changes detected"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
git push -u origin $branch
|
||||
curl -X POST "https://gitea.psi.ch/api/v1/repos/${{ gitea.repository }}/pulls" \
|
||||
-H "Authorization: token ${{ secrets.CI_REPO_WRITE }}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{
|
||||
\"title\": \"Template: $(echo $msg)\",
|
||||
\"body\": \"This PR was created by Gitea Actions\",
|
||||
\"head\": \"$(echo $branch)\",
|
||||
\"base\": \"main\"
|
||||
}"
|
||||
@@ -1,4 +0,0 @@
|
||||
include:
|
||||
- file: /templates/plugin-repo-template.yml
|
||||
inputs: {name: addams_bec, target: addams_bec}
|
||||
project: bec/awi_utils
|
||||
5
LICENSE
5
LICENSE
@@ -1,6 +1,7 @@
|
||||
|
||||
BSD 3-Clause License
|
||||
|
||||
Copyright (c) 2024, Paul Scherrer Institute
|
||||
Copyright (c) 2025, Paul Scherrer Institute
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
@@ -25,4 +26,4 @@ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
@@ -1,75 +1,57 @@
|
||||
import builtins
|
||||
import collections
|
||||
import functools
|
||||
import json
|
||||
import math
|
||||
import pathlib
|
||||
|
||||
import numpy
|
||||
from bec_ipython_client.main import BECClientPrompt
|
||||
|
||||
from bec_ipython_client.prettytable import PrettyTable
|
||||
|
||||
__all__ = [
|
||||
'setlat',
|
||||
'setlambda',
|
||||
'setmode',
|
||||
'freeze',
|
||||
'unfreeze',
|
||||
'br',
|
||||
'ubr',
|
||||
'mvhkl',
|
||||
'umvhkl',
|
||||
'ca',
|
||||
'wh',
|
||||
'pa',
|
||||
'orientAdd',
|
||||
'orientRemove',
|
||||
'orientShow',
|
||||
'orientFit',
|
||||
'ct'
|
||||
"setlat",
|
||||
"setlambda",
|
||||
"setmode",
|
||||
"freeze",
|
||||
"unfreeze",
|
||||
"br",
|
||||
"ubr",
|
||||
"mvhkl",
|
||||
"umvhkl",
|
||||
"ca",
|
||||
"wh",
|
||||
"pa",
|
||||
"orientAdd",
|
||||
"orientRemove",
|
||||
"orientShow",
|
||||
"orientFit",
|
||||
"ct",
|
||||
]
|
||||
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
class BECClientPromptDiffractometer(BECClientPrompt):
|
||||
|
||||
@property
|
||||
def username(self):
|
||||
"""current username"""
|
||||
if "x04v" in dev:
|
||||
return "x04v"
|
||||
if "x04h" in dev:
|
||||
return "x04h"
|
||||
return "demo"
|
||||
|
||||
bec._ip.prompts = BECClientPromptDiffractometer(ip=bec._ip, username="demo", client=bec._client, status=1)
|
||||
|
||||
|
||||
# check for diffractometer device
|
||||
diffract = None
|
||||
if dev is not None:
|
||||
if 'x04h' in dev:
|
||||
if "x04h" in dev:
|
||||
diffract = dev.x04h
|
||||
elif 'x04v' in dev:
|
||||
elif "x04v" in dev:
|
||||
diffract = dev.x04v
|
||||
|
||||
if diffract is not None:
|
||||
RealPosition = collections.namedtuple('RealPosition', ' '.join(diffract.get_real_positioners()))
|
||||
RealPosition = collections.namedtuple("RealPosition", " ".join(diffract.get_real_positioners()))
|
||||
|
||||
|
||||
def freeze(
|
||||
angle: float | None
|
||||
):
|
||||
def freeze(angle: float | None):
|
||||
"""
|
||||
Freeze the value of the mode dependent angle, so when calculating motor positions
|
||||
corresponding to an arbitrary (H, K, L ), the angle will be reset to the frozen value
|
||||
Freeze the value of the mode dependent angle, so when calculating motor positions
|
||||
corresponding to an arbitrary (H, K, L ), the angle will be reset to the frozen value
|
||||
before the calculation no matter what the current position of the diffractometer.
|
||||
"""
|
||||
diffract.freeze(angle)
|
||||
|
||||
|
||||
def unfreeze():
|
||||
"""
|
||||
Subsequent angle calculations will use whatever the current value of the associated
|
||||
@@ -77,74 +59,69 @@ def unfreeze():
|
||||
"""
|
||||
diffract.unfreeze()
|
||||
|
||||
def setlat(
|
||||
a: float, b: float, c: float, alpha: float, beta: float, gamma: float
|
||||
):
|
||||
|
||||
def setlat(a: float, b: float, c: float, alpha: float, beta: float, gamma: float):
|
||||
"""
|
||||
Set sample lattice parameters
|
||||
"""
|
||||
diffract.set_lattice((a, b, c, alpha, beta, gamma))
|
||||
|
||||
def setlambda(
|
||||
wavelength: float
|
||||
):
|
||||
|
||||
def setlambda(wavelength: float):
|
||||
"""
|
||||
Set the x-ray wavelength (in Angstroms)
|
||||
"""
|
||||
if wavelength <= 0:
|
||||
print('Invalid input: wavelength <=0!')
|
||||
print("Invalid input: wavelength <=0!")
|
||||
return
|
||||
|
||||
current_wavelength = diffract.get_wavelength()
|
||||
if math.isclose(wavelength, current_wavelength):
|
||||
print(f'Still using {current_wavelength} A')
|
||||
print(f"Still using {current_wavelength} A")
|
||||
else:
|
||||
diffract.set_wavelength(wavelength)
|
||||
print(f'Lambda reset from {current_wavelength} to {wavelength} A')
|
||||
print(f"Lambda reset from {current_wavelength} to {wavelength} A")
|
||||
|
||||
def setmode(
|
||||
mode: int
|
||||
):
|
||||
|
||||
def setmode(mode: int):
|
||||
"""
|
||||
Set the geometry mode
|
||||
"""
|
||||
if mode < 0 or mode > 2:
|
||||
print('Valid mode is from 0 to 2')
|
||||
print("Valid mode is from 0 to 2")
|
||||
return
|
||||
|
||||
current_mode = diffract.get_mode()
|
||||
if mode == current_mode:
|
||||
print(f'Still using mode {current_mode}')
|
||||
print(f"Still using mode {current_mode}")
|
||||
else:
|
||||
diffract.set_mode(mode)
|
||||
print(f'Mode reset from {current_mode} to {mode}')
|
||||
print(f"Mode reset from {current_mode} to {mode}")
|
||||
|
||||
def mvhkl(
|
||||
h: float, k: float, l: float, auto=False
|
||||
):
|
||||
|
||||
def mvhkl(h: float, k: float, l: float, auto=False):
|
||||
"""
|
||||
Move to the reciprocol space coordinates
|
||||
"""
|
||||
try:
|
||||
angles = diffract.forward(h, k, l)[:-2]
|
||||
except Exception as exc:
|
||||
print(f'{h} {k} {l} is not obtainable: {exc}')
|
||||
print(f"{h} {k} {l} is not obtainable: {exc}")
|
||||
return
|
||||
|
||||
if not auto:
|
||||
for axis, current, target in zip(RealPosition._fields, _currentPosition(), angles):
|
||||
print('%7s = %9.4f --> %9.4f' % (axis, current, target))
|
||||
print("%7s = %9.4f --> %9.4f" % (axis, current, target))
|
||||
|
||||
answer = input('Move to these values? [Y/n]: ')
|
||||
if answer.startswith(('N', 'n')):
|
||||
print('Move abandoned.')
|
||||
answer = input("Move to these values? [Y/n]: ")
|
||||
if answer.startswith(("N", "n")):
|
||||
print("Move abandoned.")
|
||||
return
|
||||
|
||||
|
||||
br(h, k, l)
|
||||
|
||||
def br(
|
||||
h: float, k: float, l: float
|
||||
):
|
||||
|
||||
def br(h: float, k: float, l: float):
|
||||
"""
|
||||
Move to the reciprocol space coordinates
|
||||
"""
|
||||
@@ -156,9 +133,8 @@ def br(
|
||||
|
||||
scans.mv(*args, relative=False)
|
||||
|
||||
def ubr(
|
||||
h: float, k: float, l: float
|
||||
):
|
||||
|
||||
def ubr(h: float, k: float, l: float):
|
||||
"""
|
||||
Move to the reciprocol space coordinates with updates
|
||||
"""
|
||||
@@ -170,44 +146,43 @@ def ubr(
|
||||
|
||||
scans.umv(*args, relative=False)
|
||||
|
||||
def umvhkl(
|
||||
h: float, k: float, l: float, auto=False
|
||||
):
|
||||
|
||||
def umvhkl(h: float, k: float, l: float, auto=False):
|
||||
"""
|
||||
Move to the reciprocol space coordinates with updates
|
||||
"""
|
||||
try:
|
||||
angles = diffract.forward(h, k, l)[:-2]
|
||||
except Exception as exc:
|
||||
print(f'{h} {k} {l} is not obtainable: {exc}')
|
||||
print(f"{h} {k} {l} is not obtainable: {exc}")
|
||||
return
|
||||
|
||||
if not auto:
|
||||
for axis, current, target in zip(RealPosition._fields, _currentPosition(), angles):
|
||||
print('%7s = %9.4f --> %9.4f' % (axis, current, target))
|
||||
print("%7s = %9.4f --> %9.4f" % (axis, current, target))
|
||||
|
||||
answer = input('Move to these values? [Y/n]: ')
|
||||
if answer.startswith(('N', 'n')):
|
||||
print('Move abandoned.')
|
||||
answer = input("Move to these values? [Y/n]: ")
|
||||
if answer.startswith(("N", "n")):
|
||||
print("Move abandoned.")
|
||||
return
|
||||
|
||||
|
||||
ubr(h, k, l)
|
||||
|
||||
def ca(
|
||||
h: float, k: float, l: float
|
||||
):
|
||||
|
||||
def ca(h: float, k: float, l: float):
|
||||
"""
|
||||
Calculate angle positions for a given point in reciprocol space
|
||||
"""
|
||||
angles = diffract.forward(h, k, l)
|
||||
print("\nCalculated positions:\n")
|
||||
print(f'H K L = {h} {k} {l}')
|
||||
print('BetaIn = %.5f BetaOut = %.5f' %(angles[-2], angles[-1]))
|
||||
print('Lambda = %.3f' % diffract.get_wavelength())
|
||||
print(f"H K L = {h} {k} {l}")
|
||||
print("BetaIn = %.5f BetaOut = %.5f" % (angles[-2], angles[-1]))
|
||||
print("Lambda = %.3f" % diffract.get_wavelength())
|
||||
|
||||
print()
|
||||
_showAngles(angles[:-2])
|
||||
|
||||
|
||||
def wh():
|
||||
"""
|
||||
Show where principal axes and reciprocal space
|
||||
@@ -218,41 +193,45 @@ def wh():
|
||||
betaIn = diffract.betaIn.position
|
||||
betaOut = diffract.betaOut.position
|
||||
|
||||
print(f'H K L = {h:.4f} {k:.4f} {l:.4f}')
|
||||
print('BetaIn = %.5f BetaOut = %.5f' %(betaIn, betaOut))
|
||||
print('Lambda = %.3f' % diffract.get_wavelength())
|
||||
print(f"H K L = {h:.4f} {k:.4f} {l:.4f}")
|
||||
print("BetaIn = %.5f BetaOut = %.5f" % (betaIn, betaOut))
|
||||
print("Lambda = %.3f" % diffract.get_wavelength())
|
||||
|
||||
print()
|
||||
_showAngles()
|
||||
|
||||
|
||||
def pa():
|
||||
"""
|
||||
Show geometry parameters
|
||||
"""
|
||||
if diffract.name == 'x04v':
|
||||
print('x04v (Newport Microcontrols 2+3 at SLS) vertical geometry')
|
||||
elif diffract.name == 'x04h':
|
||||
print('x04h (Newport Microcontrols 2+3 at SLS) horizontal geometry')
|
||||
if diffract.name == "x04v":
|
||||
print("x04v (Newport Microcontrols 2+3 at SLS) vertical geometry")
|
||||
elif diffract.name == "x04h":
|
||||
print("x04h (Newport Microcontrols 2+3 at SLS) horizontal geometry")
|
||||
|
||||
match mode := diffract.get_mode():
|
||||
case 0:
|
||||
print(f' BetaIn Fixed (mode {mode})')
|
||||
print(f" BetaIn Fixed (mode {mode})")
|
||||
case 1:
|
||||
print(f' BetaOut Fixed (mode {mode})')
|
||||
print(f" BetaOut Fixed (mode {mode})")
|
||||
case 2:
|
||||
print(f' BetaIn equals BetaOut (mode {mode})')
|
||||
|
||||
print(f" BetaIn equals BetaOut (mode {mode})")
|
||||
|
||||
if beta_frozen := diffract.get_frozen():
|
||||
print(f' Frozen coordinate: {beta_frozen}')
|
||||
print(f" Frozen coordinate: {beta_frozen}")
|
||||
|
||||
|
||||
def orientShow():
|
||||
"""
|
||||
Display list of measured reflections
|
||||
"""
|
||||
print('\n(Using lattice constants:)')
|
||||
print("\n(Using lattice constants:)")
|
||||
lattice = diffract.get_lattice()
|
||||
print('a = %.4g, b = %.4g, b = %.4g, alpha = %.6g, beta = %.6g, gamma = %.6g' %
|
||||
(lattice[0], lattice[1], lattice[2], lattice[3], lattice[4], lattice[5]))
|
||||
print(
|
||||
"a = %.4g, b = %.4g, b = %.4g, alpha = %.6g, beta = %.6g, gamma = %.6g"
|
||||
% (lattice[0], lattice[1], lattice[2], lattice[3], lattice[4], lattice[5])
|
||||
)
|
||||
|
||||
print("\n------------------------------------------------\n")
|
||||
|
||||
@@ -263,17 +242,15 @@ def orientShow():
|
||||
|
||||
_showUB()
|
||||
|
||||
def orientRemove(
|
||||
h: float, k: float, l: float
|
||||
):
|
||||
|
||||
def orientRemove(h: float, k: float, l: float):
|
||||
"""
|
||||
Remove a measured reflection from the list
|
||||
"""
|
||||
diffract.remove_reflection(h, k, l)
|
||||
|
||||
def orientAdd(
|
||||
h: float, k: float, l: float, *args
|
||||
):
|
||||
|
||||
def orientAdd(h: float, k: float, l: float, *args):
|
||||
"""
|
||||
Add a reflection to the list of measured reflections
|
||||
"""
|
||||
@@ -282,63 +259,67 @@ def orientAdd(
|
||||
response = diffract.real_position
|
||||
# The original return value is of namedtuple type,
|
||||
# which gets serialized to a dictionary by the device server.
|
||||
angles = tuple(response['values'][axis] for axis in response['fields'] if axis != 'nu')
|
||||
angles = tuple(response["values"][axis] for axis in response["fields"] if axis != "nu")
|
||||
|
||||
if len(angles) < 4:
|
||||
print('Please specify all angles')
|
||||
print("Please specify all angles")
|
||||
return
|
||||
|
||||
diffract.add_reflection(h, k, l, angles)
|
||||
|
||||
def orientSave(
|
||||
filename: str
|
||||
):
|
||||
|
||||
def orientSave(filename: str):
|
||||
"""
|
||||
Save the current reflections
|
||||
"""
|
||||
configuration = {}
|
||||
configuration['geometry'] = diffract.name
|
||||
configuration['wavelength'] = diffract.get_wavelength()
|
||||
configuration['lattice'] = diffract.get_lattice()
|
||||
configuration['reflections'] = diffract.get_reflections()
|
||||
|
||||
configuration["geometry"] = diffract.name
|
||||
configuration["wavelength"] = diffract.get_wavelength()
|
||||
configuration["lattice"] = diffract.get_lattice()
|
||||
configuration["reflections"] = diffract.get_reflections()
|
||||
|
||||
filepath = pathlib.Path(filename)
|
||||
if filepath.exists():
|
||||
answer = input('File "%s" already exists. Do you want to overwrite it? [y/N]: ' %(filepath.absolute()))
|
||||
if not answer.startswith(('Y', 'y')):
|
||||
answer = input(
|
||||
'File "%s" already exists. Do you want to overwrite it? [y/N]: ' % (filepath.absolute())
|
||||
)
|
||||
if not answer.startswith(("Y", "y")):
|
||||
return
|
||||
|
||||
with open(filepath, 'w') as f:
|
||||
with open(filepath, "w") as f:
|
||||
json.dump(configuration, f)
|
||||
|
||||
def orientLoad(
|
||||
filename: str
|
||||
):
|
||||
|
||||
def orientLoad(filename: str):
|
||||
"""
|
||||
Load relfections from file
|
||||
"""
|
||||
with open(filename, 'r') as f:
|
||||
with open(filename, "r") as f:
|
||||
configuration = json.load(f)
|
||||
if configuration['geometry'] != diffract.name:
|
||||
print('Saved orientation is for a different geometry "%s", current is "%s".' % configuration['geometry'], diffract.name)
|
||||
if configuration["geometry"] != diffract.name:
|
||||
print(
|
||||
'Saved orientation is for a different geometry "%s", current is "%s".'
|
||||
% configuration["geometry"],
|
||||
diffract.name,
|
||||
)
|
||||
return
|
||||
|
||||
# save current wavelength, lattice and reflections
|
||||
saved_wavelength = diffract.get_wavelength()
|
||||
saved_lattice = diffract.get_lattice()
|
||||
saved_reflections = diffract.get_reflections()
|
||||
|
||||
|
||||
try:
|
||||
diffract.set_lattice(configuration['lattice'])
|
||||
|
||||
diffract.set_lattice(configuration["lattice"])
|
||||
|
||||
diffract.clear_reflections()
|
||||
for reflection in configuration['reflections']:
|
||||
for reflection in configuration["reflections"]:
|
||||
diffract.add_reflection(*reflection)
|
||||
|
||||
_showReflections(configuration['reflections'])
|
||||
_showReflections(configuration["reflections"])
|
||||
print("\n------------------------------------------------\n")
|
||||
# set wavelength temporarily for orientFit and restore later
|
||||
diffract.set_wavelength(configuration['wavelength'])
|
||||
diffract.set_wavelength(configuration["wavelength"])
|
||||
orientFit()
|
||||
except Exception as exc:
|
||||
# restore saved lattice and reflections
|
||||
@@ -352,6 +333,7 @@ def orientLoad(
|
||||
# restore wavelength
|
||||
diffract.set_wavelength(saved_wavelength)
|
||||
|
||||
|
||||
def orientFit():
|
||||
"""
|
||||
Fit UB matrix from given reflections
|
||||
@@ -364,19 +346,22 @@ def orientFit():
|
||||
diffract.compute_UB()
|
||||
_showUB()
|
||||
|
||||
|
||||
def ct(exp_time: float):
|
||||
"""
|
||||
Acquire all detectors
|
||||
"""
|
||||
scans.acquire(exp_time=exp_time)
|
||||
|
||||
|
||||
def _showUB():
|
||||
UB = diffract.get_UB()
|
||||
|
||||
print('Orientation matrix by row:')
|
||||
print(' Row 1: %8.5f %8.5f %8.5f' % (UB[0,0], UB[0,1], UB[0,2]))
|
||||
print(' Row 2: %8.5f %8.5f %8.5f' % (UB[1,0], UB[1,1], UB[1,2]))
|
||||
print(' Row 3: %8.5f %8.5f %8.5f' % (UB[2,0], UB[2,1], UB[2,2]))
|
||||
|
||||
print("Orientation matrix by row:")
|
||||
print(" Row 1: %8.5f %8.5f %8.5f" % (UB[0, 0], UB[0, 1], UB[0, 2]))
|
||||
print(" Row 2: %8.5f %8.5f %8.5f" % (UB[1, 0], UB[1, 1], UB[1, 2]))
|
||||
print(" Row 3: %8.5f %8.5f %8.5f" % (UB[2, 0], UB[2, 1], UB[2, 2]))
|
||||
|
||||
|
||||
def _showAngles(angles=None):
|
||||
if angles is None:
|
||||
@@ -384,23 +369,25 @@ def _showAngles(angles=None):
|
||||
|
||||
table = PrettyTable(RealPosition._fields, padding=12)
|
||||
print(table.get_header())
|
||||
text = tuple(f'{x:9.4f}' for x in angles)
|
||||
text = tuple(f"{x:9.4f}" for x in angles)
|
||||
print(table.get_row(*text))
|
||||
|
||||
|
||||
def _currentPosition():
|
||||
response = diffract.real_position
|
||||
# The original return value is of namedtuple type,
|
||||
# which gets serialized to a dictionary by the device server.
|
||||
angles = RealPosition(*(response['values'][axis] for axis in response['fields']))
|
||||
angles = RealPosition(*(response["values"][axis] for axis in response["fields"]))
|
||||
return angles
|
||||
|
||||
|
||||
def _showReflections(reflections):
|
||||
print('The defined reflections are:')
|
||||
header = ['h', 'k', 'l'] + diffract.real_position['fields'][:-1]
|
||||
print("The defined reflections are:")
|
||||
header = ["h", "k", "l"] + diffract.real_position["fields"][:-1]
|
||||
table = PrettyTable(header, padding=12)
|
||||
print(' ', table.get_header())
|
||||
print(" ", table.get_header())
|
||||
|
||||
for reflection in reflections:
|
||||
h, k, l, angles = reflection
|
||||
text = [f'{h:9.4f}', f'{k:9.4f}', f'{l:9.4f}'] + [f'{x:9.4f}' for x in angles]
|
||||
print(' ', table.get_row(*text))
|
||||
text = [f"{h:9.4f}", f"{k:9.4f}", f"{l:9.4f}"] + [f"{x:9.4f}" for x in angles]
|
||||
print(" ", table.get_row(*text))
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, List, Tuple
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import DeviceManagerBase as DeviceManager
|
||||
|
||||
|
||||
class EnergyOptimizer:
|
||||
def __init__(self, device_manager: DeviceManager):
|
||||
self.device_manager = device_manager
|
||||
self.strips = {
|
||||
"Si": {"energy_range": [5, 10]},
|
||||
"Rh": {"energy_range": [8, 23]},
|
||||
"Pt": {"energy_range": [20, 40]},
|
||||
}
|
||||
self.overlap_energyies = {"Si": {"Rh": 9}, "Rh": {"Si": 9, "Pt": 22}, "Pt": {"Rh": 22}}
|
||||
|
||||
@staticmethod
|
||||
def get_transition_steps(start_energy: float, target_energy: float) -> List[Tuple[float, str]]:
|
||||
"""
|
||||
Get the required steps to transition from one energy to another.
|
||||
|
||||
Args:
|
||||
start_energy: The starting energy in keV.
|
||||
target_energy: The target energy in keV.
|
||||
|
||||
Returns:
|
||||
A list of tuples containing the energy and the strip name for each step.
|
||||
"""
|
||||
strips = {"Si": (5, 10), "Rh": (8, 23), "Pt": (20, 40)}
|
||||
overlap_energyies = {"Si": {"Rh": 9}, "Rh": {"Si": 9, "Pt": 22}, "Pt": {"Rh": 22}}
|
||||
|
||||
def get_strip(energy: float) -> str:
|
||||
for strip, (low, high) in strips.items():
|
||||
if low <= energy <= high:
|
||||
return strip
|
||||
return ""
|
||||
|
||||
def find_overlap(from_strip: str, to_strip: str) -> float:
|
||||
return overlap_energyies[from_strip][to_strip]
|
||||
|
||||
path = []
|
||||
|
||||
if get_strip(start_energy) == "":
|
||||
raise ValueError("Start energy is out of range for available strips")
|
||||
if not any(low <= target_energy <= high for low, high in strips.values()):
|
||||
raise ValueError("End energy is out of range for available strips")
|
||||
|
||||
current_energy = start_energy
|
||||
|
||||
# TODO: this should be replaced with a readout from the PV
|
||||
current_strip = get_strip(current_energy)
|
||||
|
||||
# if the target energy is covered by the current strip, return the path
|
||||
if strips[current_strip][0] <= target_energy <= strips[current_strip][1]:
|
||||
return [(target_energy, current_strip)]
|
||||
|
||||
target_strip = get_strip(target_energy)
|
||||
|
||||
available_strips = list(strips.keys())
|
||||
current_index = available_strips.index(current_strip)
|
||||
target_index = available_strips.index(target_strip)
|
||||
step = 1 if target_index > current_index else -1
|
||||
|
||||
for i in range(current_index, target_index, step):
|
||||
next_strip = available_strips[i + step]
|
||||
overlap_energy = find_overlap(available_strips[i], next_strip)
|
||||
path.append((overlap_energy, next_strip))
|
||||
current_strip = next_strip
|
||||
|
||||
path.append((target_energy, target_strip))
|
||||
|
||||
return path
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
device_manager = MagicMock()
|
||||
optimizer = EnergyOptimizer(device_manager)
|
||||
steps = optimizer.get_transition_steps(30, 20)
|
||||
print(steps)
|
||||
@@ -34,7 +34,27 @@ to setup the prompts.
|
||||
"""
|
||||
|
||||
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module
|
||||
import builtins
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_ipython_client.main import BECIPythonClient
|
||||
|
||||
bec: BECIPythonClient = BECIPythonClient()
|
||||
dev = bec.device_manager.devices
|
||||
scans = bec.scans
|
||||
|
||||
else:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
|
||||
bec.load_high_level_interface("bec_hli")
|
||||
bec.load_high_level_interface("spec_hli")
|
||||
bec.load_high_level_interface("hkl_hli")
|
||||
|
||||
|
||||
if "x04v" in dev:
|
||||
bec._ip.prompts.session_name = "x04v"
|
||||
elif "x04h" in dev:
|
||||
bec._ip.prompts.session_name = "x04h"
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
"""
|
||||
Pre-startup script for BEC client. This script is executed before the BEC client
|
||||
is started. It can be used to add additional command line arguments.
|
||||
is started. It can be used to add additional command line arguments.
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from bec_lib.service_config import ServiceConfig
|
||||
|
||||
import addams_bec
|
||||
|
||||
|
||||
def extend_command_line_args(parser):
|
||||
"""
|
||||
@@ -15,9 +19,13 @@ def extend_command_line_args(parser):
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
# def get_config() -> ServiceConfig:
|
||||
# """
|
||||
# Create and return the service configuration.
|
||||
# """
|
||||
# return ServiceConfig(redis={"host": "localhost", "port": 6379})
|
||||
def get_config() -> ServiceConfig:
|
||||
"""
|
||||
Create and return the ServiceConfig for the plugin repository
|
||||
"""
|
||||
deployment_path = os.path.dirname(os.path.dirname(os.path.dirname(addams_bec.__file__)))
|
||||
files = os.listdir(deployment_path)
|
||||
if "bec_config.yaml" in files:
|
||||
return ServiceConfig(config_path=os.path.join(deployment_path, "bec_config.yaml"))
|
||||
else:
|
||||
return ServiceConfig(redis={"host": "localhost", "port": 6379})
|
||||
|
||||
0
addams_bec/bec_widgets/auto_updates/__init__.py
Normal file
0
addams_bec/bec_widgets/auto_updates/__init__.py
Normal file
@@ -1,5 +1,3 @@
|
||||
import os
|
||||
|
||||
from ophyd import (
|
||||
ADComponent as ADCpt,
|
||||
Device,
|
||||
@@ -53,14 +51,6 @@ class Eiger500KSetup(CustomDetectorMixin):
|
||||
self.parent.cam.acquire.put(1, wait=False) # arm
|
||||
|
||||
# file writer
|
||||
# TODO: enable this when new storage path is consistent on bec server and consoles.
|
||||
# self.parent.filepath.set(
|
||||
# self.parent.filewriter.compile_full_filename(f"{self.parent.name}")
|
||||
# ).wait()
|
||||
# file_path, file_name = os.path.split(self.parent.filepath.get())
|
||||
# self.parent.hdf.file_path.put(file_path)
|
||||
# self.parent.hdf.file_name.put(file_name)
|
||||
# self.parent.hdf.file_template.put("%s%s")
|
||||
self.parent.hdf.lazy_open.put(1)
|
||||
self.parent.hdf.num_capture.put(num_points)
|
||||
self.parent.hdf.file_write_mode.put(2) # Stream
|
||||
@@ -109,8 +99,7 @@ class Eiger500KSetup(CustomDetectorMixin):
|
||||
|
||||
# publish timeseries data
|
||||
metadata = self.parent.scaninfo.scan_msg.metadata
|
||||
metadata.update({"async_update": "append", "max_shape": [None, None]})
|
||||
|
||||
metadata.update({"async_update": "append", "num_lines": self.parent.roistat.ts_current_point.get()})
|
||||
msg = messages.DeviceMessage(
|
||||
signals={
|
||||
self.parent.roistat.roi1.name_.get(): {
|
||||
|
||||
@@ -143,11 +143,11 @@ class ProfileMoveAxisXPS(ProfileMoveAxis):
|
||||
|
||||
|
||||
class ProfileMoveXPSX04SA(ProfileMoveControllerXPS):
|
||||
ov = Cpt(ProfileMoveAxisXPS, '', motor='1')
|
||||
xv = Cpt(ProfileMoveAxisXPS, '', motor='2')
|
||||
oh = Cpt(ProfileMoveAxisXPS, '', motor='3')
|
||||
phi = Cpt(ProfileMoveAxisXPS, '', motor='4')
|
||||
nu = Cpt(ProfileMoveAxisXPS, '', motor='5')
|
||||
alp = Cpt(ProfileMoveAxisXPS, '', motor='6')
|
||||
gam = Cpt(ProfileMoveAxisXPS, '', motor='7')
|
||||
delta = Cpt(ProfileMoveAxisXPS, '', motor='8')
|
||||
alp = Cpt(ProfileMoveAxisXPS, '', motor='1')
|
||||
delta = Cpt(ProfileMoveAxisXPS, '', motor='2')
|
||||
gam = Cpt(ProfileMoveAxisXPS, '', motor='3')
|
||||
ov = Cpt(ProfileMoveAxisXPS, '', motor='4')
|
||||
xv = Cpt(ProfileMoveAxisXPS, '', motor='5')
|
||||
phi = Cpt(ProfileMoveAxisXPS, '', motor='6')
|
||||
oh = Cpt(ProfileMoveAxisXPS, '', motor='7')
|
||||
nu = Cpt(ProfileMoveAxisXPS, '', motor='8')
|
||||
|
||||
6
addams_bec/macros/README.md
Normal file
6
addams_bec/macros/README.md
Normal file
@@ -0,0 +1,6 @@
|
||||
# Macros
|
||||
|
||||
This directory is intended to store macros which will be loaded automatically when starting BEC.
|
||||
Macros are small functions to make repetitive tasks easier. Functions defined in python files in this directory will be accessible from the BEC console.
|
||||
Please do not put any code outside of function definitions here. If you wish for code to be automatically run when starting BEC, see the startup script at addams_bec/bec_ipython_client/startup/post_startup.py
|
||||
For a guide on writing macros, please see: https://bec.readthedocs.io/en/latest/user/command_line_interface.html#how-to-write-a-macro
|
||||
0
addams_bec/macros/__init__.py
Normal file
0
addams_bec/macros/__init__.py
Normal file
@@ -1,4 +1,2 @@
|
||||
from .hkl_scan import HklScan
|
||||
from .fly_scan import HklFlyScan
|
||||
from .mesh_scan import HklMeshScan, HklMeshFlyScan
|
||||
from .profilemove_scan import LineProfileMove, GridProfileMove
|
||||
from .hkl_scan import HklScan
|
||||
|
||||
@@ -64,7 +64,6 @@ class HklFlyScan(AsyncFlyScanBase):
|
||||
self.positions.append(position[:-2])
|
||||
|
||||
def scan_core(self):
|
||||
all_motors = self.device_manager.devices[self.controller].axes.get()
|
||||
scan_motors = self.device_manager.devices[self.diffract].real_axes.get()
|
||||
hkls = numpy.linspace(self.start, self.stop, self.points).tolist()
|
||||
positions = yield from self.stubs.send_rpc_and_wait(self.diffract, 'angles_from_hkls', hkls)
|
||||
@@ -81,15 +80,11 @@ class HklFlyScan(AsyncFlyScanBase):
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'time_mode.put', 1)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'times.put', [total_time, total_time])
|
||||
|
||||
for axis_name in all_motors:
|
||||
if axis_name not in scan_motors:
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 0)
|
||||
else:
|
||||
index = scan_motors.index(axis_name)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller,
|
||||
f'{axis_name}.positions.put',
|
||||
(positions[0, index], positions[-1, index]))
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
|
||||
for index, axis_name in enumerate(scan_motors):
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller,
|
||||
f'{axis_name}.positions.put',
|
||||
(positions[0, index], positions[-1, index]))
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
|
||||
|
||||
else:
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_points.put', num_points)
|
||||
@@ -99,15 +94,11 @@ class HklFlyScan(AsyncFlyScanBase):
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'time_mode.put', 0)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'fixed_time.put', self.exp_time)
|
||||
|
||||
for axis_name in all_motors:
|
||||
if axis_name not in scan_motors:
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 0)
|
||||
else:
|
||||
index = scan_motors.index(axis_name)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller,
|
||||
f'{axis_name}.positions.put',
|
||||
(positions[0, index], positions[-1, index]))
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
|
||||
for index, axis_name in enumerate(scan_motors):
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller,
|
||||
f'{axis_name}.positions.put',
|
||||
positions[:, index])
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'build_profile')
|
||||
build_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'build_status.get')
|
||||
@@ -145,7 +136,7 @@ class HklFlyScan(AsyncFlyScanBase):
|
||||
logger.success(f'{self.scan_name} finished')
|
||||
|
||||
def _publish_readbacks(self, device, readbacks):
|
||||
metadata = {"async_update": "append", "max_shape": [None, None]}
|
||||
metadata = {"async_update": "append", "num_lines": len(readbacks)}
|
||||
msg = messages.DeviceMessage(
|
||||
signals={device: {'value': readbacks} }, metadata=metadata
|
||||
)
|
||||
|
||||
@@ -1,266 +0,0 @@
|
||||
import itertools
|
||||
|
||||
import numpy
|
||||
|
||||
# from bec_lib.endpoints import MessageEndpoints
|
||||
from typing import Literal
|
||||
from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.devicemanager import DeviceManagerBase
|
||||
from bec_lib.logger import bec_logger
|
||||
# from bec_lib import messages
|
||||
# from bec_server.scan_server.errors import ScanAbortion
|
||||
from bec_server.scan_server.scans import ScanAbortion, ScanArgType, ScanBase, AsyncFlyScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
class HklMeshScan(ScanBase):
|
||||
scan_name = 'hklmesh_scan'
|
||||
arg_input = {
|
||||
'index': ScanArgType.DEVICE,
|
||||
'start': ScanArgType.FLOAT,
|
||||
'stop': ScanArgType.FLOAT,
|
||||
'points': ScanArgType.INT
|
||||
}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": 2, "max": 2}
|
||||
required_kwargs = ['diffract', 'exp_time']
|
||||
|
||||
def __init__(self, *args, diffract: str, **kwargs):
|
||||
self.diffract = diffract
|
||||
super().__init__(**kwargs)
|
||||
|
||||
if any(m not in ['h', 'k', 'l'] for m in self.caller_args):
|
||||
raise ValueError("Invalid name. Must be 'h', 'k', or 'l'.")
|
||||
if len(self.caller_args) != 2:
|
||||
raise ValueError("Only 2 names can be given.")
|
||||
|
||||
self.scan_report_devices = ['h', 'k', 'l'] + self.scan_motors + self.readout_priority['monitored']
|
||||
|
||||
def update_scan_motors(self):
|
||||
self.scan_motors = self.device_manager.devices[self.diffract].real_axes.get()
|
||||
|
||||
def prepare_positions(self):
|
||||
"""
|
||||
Override base method to yield from _calculate_position method
|
||||
"""
|
||||
yield from self._calculate_positions()
|
||||
self._optimize_trajectory()
|
||||
self.num_pos = len(self.positions) * self.burst_at_each_point
|
||||
yield from self._set_position_offset()
|
||||
self._check_limits()
|
||||
|
||||
def _calculate_positions(self):
|
||||
inner_name, outer_name = [x for x in self.caller_args if x in ['h','k','l']]
|
||||
fixed_name = [x for x in ['h', 'k', 'l'] if x not in self.caller_args][0]
|
||||
fixed_value = yield from self.stubs.send_rpc_and_wait(self.diffract, f'{fixed_name}.position')
|
||||
|
||||
print(f"{inner_name=}, {outer_name=}, {fixed_name=}")
|
||||
|
||||
inner_ind, outer_ind, fixed_ind = (['h', 'k', 'l'].index(x) for x in [inner_name, outer_name, fixed_name])
|
||||
|
||||
hkls = []
|
||||
outer_start, outer_stop, outer_points = self.caller_args[outer_name]
|
||||
inner_start, inner_stop, inner_points = self.caller_args[inner_name]
|
||||
|
||||
outer_vect = numpy.linspace(outer_start, outer_stop, outer_points, dtype=float)
|
||||
for i, outer in enumerate(outer_vect):
|
||||
if i % 2 == 0:
|
||||
inner_vect = numpy.linspace(inner_start, inner_stop, inner_points, dtype=float)
|
||||
else:
|
||||
inner_vect = numpy.linspace(inner_stop, inner_start, inner_points, dtype=float)
|
||||
|
||||
for inner in inner_vect:
|
||||
hkl = [0, 0, 0]
|
||||
hkl[inner_ind] = inner
|
||||
hkl[outer_ind] = outer
|
||||
hkl[fixed_ind] = fixed_value
|
||||
hkls.append(hkl)
|
||||
|
||||
positions = yield from self.stubs.send_rpc_and_wait(self.diffract, 'angles_from_hkls', hkls)
|
||||
# the last two positions 'betaIn' and 'betaOut' are not real motors
|
||||
self.positions = []
|
||||
for position in positions:
|
||||
self.positions.append(position[:-2])
|
||||
|
||||
|
||||
class HklMeshFlyScan(AsyncFlyScanBase):
|
||||
scan_name = 'hklmesh_flyscan'
|
||||
scan_type = 'fly'
|
||||
arg_input = {
|
||||
'index': ScanArgType.DEVICE,
|
||||
'start': ScanArgType.FLOAT,
|
||||
'stop': ScanArgType.FLOAT,
|
||||
'points': ScanArgType.INT
|
||||
}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": 2, "max": 2}
|
||||
required_kwargs = ['diffract', 'controller']
|
||||
use_scan_progress_report = False
|
||||
|
||||
def __init__(self, *args, diffract: str, controller: str, **kwargs):
|
||||
self.diffract = diffract
|
||||
self.controller = controller
|
||||
super().__init__(**kwargs)
|
||||
|
||||
if any(m not in ['h', 'k', 'l'] for m in self.caller_args):
|
||||
raise ValueError("Invalid name. Must be 'h', 'k', or 'l'.")
|
||||
if len(self.caller_args) != 2:
|
||||
raise ValueError("Only 2 names can be given.")
|
||||
|
||||
self.scan_report_devices = ['h', 'k', 'l'] + self.scan_motors + self.readout_priority['monitored']
|
||||
|
||||
@property
|
||||
def monitor_sync(self):
|
||||
return self.diffract
|
||||
|
||||
def update_scan_motors(self):
|
||||
self.scan_motors = self.device_manager.devices[self.diffract].real_axes.get()
|
||||
|
||||
def update_readout_priority(self):
|
||||
self.readout_priority['async'].extend(['h', 'k', 'l'])
|
||||
self.readout_priority['async'].extend(self.scan_motors)
|
||||
|
||||
def prepare_positions(self):
|
||||
"""
|
||||
Override base method to yield from _calculate_position method
|
||||
"""
|
||||
yield from self._calculate_positions()
|
||||
self._optimize_trajectory()
|
||||
self.num_pos = len(self.positions) * self.burst_at_each_point
|
||||
yield from self._set_position_offset()
|
||||
self._check_limits()
|
||||
|
||||
def _calculate_positions(self):
|
||||
inner_name, outer_name = [x for x in self.caller_args if x in ['h','k','l']]
|
||||
fixed_name = [x for x in ['h', 'k', 'l'] if x not in self.caller_args][0]
|
||||
fixed_value = yield from self.stubs.send_rpc_and_wait(self.diffract, f'{fixed_name}.position')
|
||||
|
||||
inner_ind, outer_ind, fixed_ind = (['h', 'k', 'l'].index(x) for x in [inner_name, outer_name, fixed_name])
|
||||
|
||||
hkls = []
|
||||
outer_start, outer_stop, outer_points = self.caller_args[outer_name]
|
||||
inner_start, inner_stop, inner_points = self.caller_args[inner_name]
|
||||
|
||||
outer_vect = numpy.linspace(outer_start, outer_stop, outer_points, dtype=float)
|
||||
for i, outer in enumerate(outer_vect):
|
||||
if i % 2 == 0:
|
||||
inner_vect = numpy.linspace(inner_start, inner_stop, inner_points, dtype=float)
|
||||
else:
|
||||
inner_vect = numpy.linspace(inner_stop, inner_start, inner_points, dtype=float)
|
||||
|
||||
for inner in inner_vect:
|
||||
hkl = [0, 0, 0]
|
||||
hkl[inner_ind] = inner
|
||||
hkl[outer_ind] = outer
|
||||
hkl[fixed_ind] = fixed_value
|
||||
hkls.append(hkl)
|
||||
|
||||
positions = yield from self.stubs.send_rpc_and_wait(self.diffract, 'angles_from_hkls', hkls)
|
||||
# the last two positions 'betaIn' and 'betaOut' are not real motors
|
||||
self.positions = []
|
||||
for position in positions:
|
||||
self.positions.append(position[:-2])
|
||||
|
||||
def scan_core(self):
|
||||
all_motors = self.device_manager.devices[self.controller].axes.get()
|
||||
|
||||
inner_name, outer_name = [x for x in self.caller_args if x in ['h','k','l']]
|
||||
fixed_name = [x for x in ['h', 'k', 'l'] if x not in self.caller_args][0]
|
||||
fixed_value = yield from self.stubs.send_rpc_and_wait(self.diffract, f'{fixed_name}.position')
|
||||
|
||||
inner_ind, outer_ind, fixed_ind = (['h', 'k', 'l'].index(x) for x in [inner_name, outer_name, fixed_name])
|
||||
|
||||
num_pos = 0
|
||||
hkls = []
|
||||
outer_start, outer_stop, outer_points = self.caller_args[outer_name]
|
||||
inner_start, inner_stop, inner_points = self.caller_args[inner_name]
|
||||
|
||||
outer_vect = numpy.linspace(outer_start, outer_stop, outer_points, dtype=float)
|
||||
for i, outer in enumerate(outer_vect):
|
||||
if i % 2 == 0:
|
||||
start, stop = inner_start, inner_stop
|
||||
else:
|
||||
start, stop = inner_stop, inner_start
|
||||
|
||||
hkls = [[0, 0, 0], [0, 0, 0]]
|
||||
hkls[0][inner_ind] = start
|
||||
hkls[0][outer_ind] = outer
|
||||
hkls[0][fixed_ind] = fixed_value
|
||||
|
||||
hkls[1][inner_ind] = stop
|
||||
hkls[1][outer_ind] = outer
|
||||
hkls[1][fixed_ind] = fixed_value
|
||||
|
||||
positions = yield from self.stubs.send_rpc_and_wait(self.diffract, 'angles_from_hkls', hkls)
|
||||
|
||||
# Move motors to start position, the last two positions 'betaIn' and 'betaOut' are not real motors
|
||||
yield from self.stubs.set(device=self.scan_motors, value=positions[0][:-2])
|
||||
|
||||
inner_time = inner_points * self.exp_time
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_points.put', 2)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_pulses.put', inner_points)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'start_pulses.put', 1)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'end_pulses.put', 2)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'time_mode.put', 1)
|
||||
print(f"{inner_time=} {inner_points=} {self.exp_time=}")
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'times.put', [inner_time, inner_time])
|
||||
|
||||
for axis_name in all_motors:
|
||||
if axis_name not in self.scan_motors:
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 0)
|
||||
else:
|
||||
index = self.scan_motors.index(axis_name)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller,
|
||||
f'{axis_name}.positions.put',
|
||||
(positions[0][index], positions[-1][index]))
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
|
||||
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'build_profile')
|
||||
build_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'build_status.get')
|
||||
if build_status != 1:
|
||||
raise ScanAbortion('Profile build failed')
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_profile')
|
||||
execute_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_status.get')
|
||||
if execute_status != 1:
|
||||
raise ScanAbortion('Profile execute failed')
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'readback_profile')
|
||||
readback_status = yield from self.stubs.send_rpc_and_wait(self.controller,'readback_status.get')
|
||||
if readback_status != 1:
|
||||
raise ScanAbortion('Profile readback failed')
|
||||
|
||||
angle_readbacks = []
|
||||
for index, axis_name in enumerate(self.scan_motors):
|
||||
readbacks = yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.readbacks.get')
|
||||
self._publish_readbacks(axis_name, readbacks)
|
||||
angle_readbacks.append(readbacks)
|
||||
# motor readbacks are aranged column-wise
|
||||
angle_readbacks = [list(x) for x in zip(*angle_readbacks)]
|
||||
|
||||
hkls = yield from self.stubs.send_rpc_and_wait(self.diffract, 'hkls_from_angles', angle_readbacks)
|
||||
hkls = numpy.array(hkls)
|
||||
|
||||
for index, name in enumerate(['h', 'k', 'l', 'betaIn', 'betaOut']):
|
||||
self._publish_readbacks(name, hkls[:, index])
|
||||
|
||||
# motor readbacks have more points than generated pulses
|
||||
num_pos += len(angle_readbacks)
|
||||
|
||||
self.num_pos = num_pos
|
||||
logger.success(f'{self.scan_name} finished')
|
||||
|
||||
def _publish_readbacks(self, device, readbacks):
|
||||
metadata = {"async_update": "append", "max_shape": [None, None]}
|
||||
msg = messages.DeviceMessage(
|
||||
signals={device: {'value': readbacks} }, metadata=metadata
|
||||
)
|
||||
|
||||
self.stubs.connector.xadd(
|
||||
topic=MessageEndpoints.device_async_readback(
|
||||
scan_id=self.scan_id, device=device
|
||||
),
|
||||
msg_dict={"data": msg},
|
||||
expire=1800,
|
||||
)
|
||||
0
addams_bec/scans/metadata_schema/__init__.py
Normal file
0
addams_bec/scans/metadata_schema/__init__.py
Normal file
12
addams_bec/scans/metadata_schema/metadata_schema_registry.py
Normal file
12
addams_bec/scans/metadata_schema/metadata_schema_registry.py
Normal file
@@ -0,0 +1,12 @@
|
||||
# from .metadata_schema_template import ExampleSchema
|
||||
|
||||
METADATA_SCHEMA_REGISTRY = {
|
||||
# Add models which should be used to validate scan metadata here.
|
||||
# Make a model according to the template, and import it as above
|
||||
# Then associate it with a scan like so:
|
||||
# "example_scan": ExampleSchema
|
||||
}
|
||||
|
||||
# Define a default schema type which should be used as the fallback for everything:
|
||||
|
||||
DEFAULT_SCHEMA = None
|
||||
34
addams_bec/scans/metadata_schema/metadata_schema_template.py
Normal file
34
addams_bec/scans/metadata_schema/metadata_schema_template.py
Normal file
@@ -0,0 +1,34 @@
|
||||
# # By inheriting from BasicScanMetadata you can define a schema by which metadata
|
||||
# # supplied to a scan must be validated.
|
||||
# # This schema is a Pydantic model: https://docs.pydantic.dev/latest/concepts/models/
|
||||
# # but by default it will still allow you to add any arbitrary information to it.
|
||||
# # That is to say, when you run a scan with which such a model has been associated in the
|
||||
# # metadata_schema_registry, you can supply any python dictionary with strings as keys
|
||||
# # and built-in python types (strings, integers, floats) as values, and these will be
|
||||
# # added to the experiment metadata, but it *must* contain the keys and values of the
|
||||
# # types defined in the schema class.
|
||||
# #
|
||||
# #
|
||||
# # For example, say that you would like to enforce recording information about sample
|
||||
# # pretreatment, you could define the following:
|
||||
# #
|
||||
#
|
||||
# from bec_lib.metadata_schema import BasicScanMetadata
|
||||
#
|
||||
#
|
||||
# class ExampleSchema(BasicScanMetadata):
|
||||
# treatment_description: str
|
||||
# treatment_temperature_k: int
|
||||
#
|
||||
#
|
||||
# # If this was used according to the example in metadata_schema_registry.py,
|
||||
# # then when calling the scan, the user would need to write something like:
|
||||
# >>> scans.example_scan(
|
||||
# >>> motor,
|
||||
# >>> 1,
|
||||
# >>> 2,
|
||||
# >>> 3,
|
||||
# >>> metadata={"treatment_description": "oven overnight", "treatment_temperature_k": 575},
|
||||
# >>> )
|
||||
#
|
||||
# # And the additional metadata would be saved in the HDF5 file created for the scan.
|
||||
@@ -1,233 +0,0 @@
|
||||
import numpy
|
||||
|
||||
from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.devicemanager import DeviceManagerBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import ScanAbortion, ScanArgType, ScanBase, AsyncFlyScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
class LineProfileMove(AsyncFlyScanBase):
|
||||
scan_name = 'line_profilemove'
|
||||
scan_type = 'fly'
|
||||
arg_input = {
|
||||
'motor': ScanArgType.DEVICE,
|
||||
'start': ScanArgType.FLOAT,
|
||||
'stop': ScanArgType.FLOAT,
|
||||
}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None}
|
||||
required_kwargs = ['controller', 'points']
|
||||
use_scan_progress_report = False
|
||||
|
||||
def __init__(self, *args, controller: str, points: int, **kwargs):
|
||||
self.controller = controller
|
||||
self.points = self.num_pos = points
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.all_motors = self.device_manager.devices[self.controller].axes.get()
|
||||
if not all(m in self.all_motors for m in self.scan_motors):
|
||||
raise ValueError(f"Scan motors {self.scan_motors} not in {self.all_motors}")
|
||||
|
||||
def update_readout_priority(self):
|
||||
self.readout_priority['async'].extend(self.scan_motors)
|
||||
|
||||
def update_scan_motors(self):
|
||||
ScanBase.update_scan_motors(self)
|
||||
|
||||
def prepare_positions(self):
|
||||
"""prepare the positions for the scan"""
|
||||
self._calculate_positions()
|
||||
self._optimize_trajectory()
|
||||
self.num_pos = len(self.positions) * self.burst_at_each_point
|
||||
yield from self._set_position_offset()
|
||||
self._check_limits()
|
||||
|
||||
def _calculate_positions(self):
|
||||
axis = []
|
||||
for _, val in self.caller_args.items():
|
||||
ax_pos = numpy.linspace(val[0], val[1], self.points, dtype=float)
|
||||
axis.append(ax_pos)
|
||||
self.positions = numpy.array(list(zip(*axis)), dtype=float)
|
||||
|
||||
def scan_core(self):
|
||||
|
||||
total_time = self.exp_time * self.points
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_points.put', 2)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_pulses.put', self.points)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'start_pulses.put', 1)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'end_pulses.put', 2)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'time_mode.put', 1)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'times.put', [total_time, total_time])
|
||||
|
||||
for axis_name in self.all_motors:
|
||||
if axis_name not in self.scan_motors:
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 0)
|
||||
else:
|
||||
index = self.scan_motors.index(axis_name)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller,
|
||||
f'{axis_name}.positions.put',
|
||||
(self.caller_args[axis_name][0], self.caller_args[axis_name][1]))
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'build_profile')
|
||||
build_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'build_status.get')
|
||||
if build_status != 1:
|
||||
raise ScanAbortion('Profile build failed')
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_profile')
|
||||
execute_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_status.get')
|
||||
if execute_status != 1:
|
||||
raise ScanAbortion('Profile execute failed')
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'readback_profile')
|
||||
readback_status = yield from self.stubs.send_rpc_and_wait(self.controller,'readback_status.get')
|
||||
if readback_status != 1:
|
||||
raise ScanAbortion('Profile readback failed')
|
||||
|
||||
for index, axis_name in enumerate(self.scan_motors):
|
||||
readbacks = yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.readbacks.get')
|
||||
self._publish_readbacks(axis_name, readbacks)
|
||||
|
||||
logger.success(f'{self.scan_name} finished')
|
||||
|
||||
def _publish_readbacks(self, device, readbacks):
|
||||
metadata = {"async_update": "append", "max_shape": [None, None]}
|
||||
msg = messages.DeviceMessage(
|
||||
signals={device: {'value': readbacks} }, metadata=metadata
|
||||
)
|
||||
|
||||
self.stubs.connector.xadd(
|
||||
topic=MessageEndpoints.device_async_readback(
|
||||
scan_id=self.scan_id, device=device
|
||||
),
|
||||
msg_dict={"data": msg},
|
||||
expire=1800,
|
||||
)
|
||||
|
||||
|
||||
class GridProfileMove(AsyncFlyScanBase):
|
||||
scan_name = 'grid_profilemove'
|
||||
scan_type = 'fly'
|
||||
arg_input = {
|
||||
'motor': ScanArgType.DEVICE,
|
||||
'start': ScanArgType.FLOAT,
|
||||
'stop': ScanArgType.FLOAT,
|
||||
'points': ScanArgType.INT,
|
||||
}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": 2, "max": 2}
|
||||
required_kwargs = ['controller']
|
||||
use_scan_progress_report = False
|
||||
|
||||
def __init__(self, *args, controller: str, **kwargs):
|
||||
self.controller = controller
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.all_motors = self.device_manager.devices[self.controller].axes.get()
|
||||
if not all(m in self.all_motors for m in self.scan_motors):
|
||||
raise ValueError(f"Scan motors {self.scan_motors} not in {self.all_motors}")
|
||||
|
||||
def update_readout_priority(self):
|
||||
self.readout_priority['async'].extend(self.scan_motors)
|
||||
|
||||
def update_scan_motors(self):
|
||||
ScanBase.update_scan_motors(self)
|
||||
|
||||
def prepare_positions(self):
|
||||
"""prepare the positions for the scan"""
|
||||
self._calculate_positions()
|
||||
self._optimize_trajectory()
|
||||
self.num_pos = len(self.positions) * self.burst_at_each_point
|
||||
yield from self._set_position_offset()
|
||||
self._check_limits()
|
||||
|
||||
def _calculate_positions(self):
|
||||
inner_motor, outer_motor = self.scan_motors
|
||||
outer_start, outer_stop, outer_points = self.caller_args[outer_motor]
|
||||
inner_start, inner_stop, inner_points = self.caller_args[inner_motor]
|
||||
|
||||
self.positions = []
|
||||
outer_vect = numpy.linspace(outer_start, outer_stop, outer_points, dtype=float)
|
||||
for i, outer in enumerate(outer_vect):
|
||||
if i % 2 == 0:
|
||||
inner_vect = numpy.linspace(inner_start, inner_stop, inner_points, dtype=float)
|
||||
else:
|
||||
inner_vect = numpy.linspace(inner_stop, inner_start, inner_points, dtype=float)
|
||||
|
||||
for inner in inner_vect:
|
||||
self.positions.append((inner, outer))
|
||||
|
||||
def scan_core(self):
|
||||
inner_motor, outer_motor = self.scan_motors
|
||||
outer_start, outer_stop, outer_points = self.caller_args[outer_motor]
|
||||
inner_start, inner_stop, inner_points = self.caller_args[inner_motor]
|
||||
|
||||
num_pos = 0
|
||||
outer_vect = numpy.linspace(outer_start, outer_stop, outer_points, dtype=float)
|
||||
for i, outer in enumerate(outer_vect):
|
||||
if i % 2 == 0:
|
||||
start, stop = inner_start, inner_stop
|
||||
else:
|
||||
start, stop = inner_stop, inner_start
|
||||
|
||||
# Move outer motor to start position
|
||||
yield from self.stubs.set(device=outer_motor, value=outer)
|
||||
outer_readback = yield from self.stubs.send_rpc_and_wait(outer_motor, "position")
|
||||
|
||||
inner_time = self.exp_time * inner_points
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_points.put', 2)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_pulses.put', inner_points)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'start_pulses.put', 1)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'end_pulses.put', 2)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'time_mode.put', 1)
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'times.put', [inner_time, inner_time])
|
||||
|
||||
for axis_name in self.all_motors:
|
||||
if axis_name == inner_motor:
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller,
|
||||
f'{axis_name}.positions.put',
|
||||
(start, stop))
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
|
||||
else:
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 0)
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'build_profile')
|
||||
build_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'build_status.get')
|
||||
if build_status != 1:
|
||||
raise ScanAbortion('Profile build failed')
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_profile')
|
||||
execute_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_status.get')
|
||||
if execute_status != 1:
|
||||
raise ScanAbortion('Profile execute failed')
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.controller, 'readback_profile')
|
||||
readback_status = yield from self.stubs.send_rpc_and_wait(self.controller,'readback_status.get')
|
||||
if readback_status != 1:
|
||||
raise ScanAbortion('Profile readback failed')
|
||||
|
||||
readbacks = yield from self.stubs.send_rpc_and_wait(self.controller, f'{inner_motor}.readbacks.get')
|
||||
self._publish_readbacks(inner_motor, readbacks)
|
||||
|
||||
readbacks = [outer_readback] * len(readbacks)
|
||||
self._publish_readbacks(outer_motor, readbacks)
|
||||
|
||||
# motor readbacks have more points than generated pulses
|
||||
num_pos += len(readbacks)
|
||||
|
||||
self.num_pos = num_pos
|
||||
logger.success(f'{self.scan_name} finished')
|
||||
|
||||
def _publish_readbacks(self, device, readbacks):
|
||||
metadata = {"async_update": "append", "max_shape": [None]}
|
||||
msg = messages.DeviceMessage(
|
||||
signals={device: {'value': readbacks} }, metadata=metadata
|
||||
)
|
||||
|
||||
self.stubs.connector.xadd(
|
||||
topic=MessageEndpoints.device_async_readback(
|
||||
scan_id=self.scan_id, device=device
|
||||
),
|
||||
msg_dict={"data": msg},
|
||||
expire=1800,
|
||||
)
|
||||
1
bin/.gitignore
vendored
Normal file
1
bin/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
# Add anything you don't want to check in to git, e.g. very large files
|
||||
@@ -5,8 +5,8 @@ build-backend = "hatchling.build"
|
||||
[project]
|
||||
name = "addams_bec"
|
||||
version = "0.0.0"
|
||||
description = "Custom device implementations based on the ophyd hardware abstraction layer"
|
||||
requires-python = ">=3.10"
|
||||
description = "A plugin repository for BEC"
|
||||
requires-python = ">=3.11"
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Programming Language :: Python :: 3",
|
||||
@@ -17,6 +17,7 @@ dependencies = []
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"black",
|
||||
"copier",
|
||||
"isort",
|
||||
"coverage",
|
||||
"pylint",
|
||||
@@ -38,12 +39,15 @@ plugin_file_writer = "addams_bec.file_writer"
|
||||
[project.entry-points."bec.scans"]
|
||||
plugin_scans = "addams_bec.scans"
|
||||
|
||||
[project.entry-points."bec.scans.metadata_schema"]
|
||||
plugin_metadata_schema = "addams_bec.scans.metadata_schema"
|
||||
|
||||
[project.entry-points."bec.ipython_client_startup"]
|
||||
plugin_ipython_client_pre = "addams_bec.bec_ipython_client.startup.pre_startup"
|
||||
plugin_ipython_client_post = "addams_bec.bec_ipython_client.startup"
|
||||
|
||||
[project.entry-points."bec.widgets.auto_updates"]
|
||||
plugin_widgets_update = "addams_bec.bec_widgets.auto_updates:PlotUpdate"
|
||||
plugin_widgets_update = "addams_bec.bec_widgets.auto_updates"
|
||||
|
||||
[project.entry-points."bec.widgets.user_widgets"]
|
||||
plugin_widgets = "addams_bec.bec_widgets.widgets"
|
||||
|
||||
@@ -1,31 +1,34 @@
|
||||
# Getting Started with Testing using pytest
|
||||
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
|
||||
It can be install via
|
||||
``` bash
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
|
||||
It can be installed via
|
||||
|
||||
```bash
|
||||
pip install pytest
|
||||
```
|
||||
in your *python environment*.
|
||||
|
||||
in your _python environment_.
|
||||
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
|
||||
|
||||
## Introduction
|
||||
|
||||
Tests in this package should be stored in the `tests` directory.
|
||||
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
|
||||
It is mandatory for test files to begin with `test_` for pytest to discover them.
|
||||
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
|
||||
``` bash
|
||||
```bash
|
||||
pytest -v --random-order ./tests
|
||||
```
|
||||
|
||||
Note, the python environment needs to be active.
|
||||
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
|
||||
## Test examples
|
||||
|
||||
Writing tests can be quite specific for the given function.
|
||||
Writing tests can be quite specific for the given function.
|
||||
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
|
||||
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
|
||||
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).
|
||||
|
||||
|
||||
38
tests/tests_bec_ipython_client/test_energy_optimizer.py
Normal file
38
tests/tests_bec_ipython_client/test_energy_optimizer.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
from addams_bec.bec_ipython_client.plugins.energy_optimizer.addams_energy_optimizer import (
|
||||
EnergyOptimizer,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def optimizer():
|
||||
dm = mock.MagicMock()
|
||||
yield EnergyOptimizer(dm)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"start_energy, target_energy, expected",
|
||||
[
|
||||
(5, 10, [(10, "Si")]),
|
||||
(5, 20, [(9, "Rh"), (20, "Rh")]),
|
||||
(5, 40, [(9, "Rh"), (22, "Pt"), (40, "Pt")]),
|
||||
(5, 5, [(5, "Si")]),
|
||||
(5, 4, ValueError),
|
||||
(5, 41, ValueError),
|
||||
(2, 8, ValueError),
|
||||
(18, 40, [(22, "Pt"), (40, "Pt")]),
|
||||
(18, 5, [(9, "Si"), (5, "Si")]),
|
||||
(18, 10, [(10, "Rh")]),
|
||||
(18, 20, [(20, "Rh")]),
|
||||
(25, 7, [(22, "Rh"), (9, "Si"), (7, "Si")]),
|
||||
],
|
||||
)
|
||||
def test_get_transition_steps(optimizer, start_energy, target_energy, expected):
|
||||
if expected == ValueError:
|
||||
with pytest.raises(ValueError):
|
||||
optimizer.get_transition_steps(start_energy, target_energy)
|
||||
else:
|
||||
assert optimizer.get_transition_steps(start_energy, target_energy) == expected
|
||||
@@ -1,31 +1,34 @@
|
||||
# Getting Started with Testing using pytest
|
||||
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
|
||||
It can be install via
|
||||
``` bash
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
|
||||
It can be installed via
|
||||
|
||||
```bash
|
||||
pip install pytest
|
||||
```
|
||||
in your *python environment*.
|
||||
|
||||
in your _python environment_.
|
||||
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
|
||||
|
||||
## Introduction
|
||||
|
||||
Tests in this package should be stored in the `tests` directory.
|
||||
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
|
||||
It is mandatory for test files to begin with `test_` for pytest to discover them.
|
||||
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
|
||||
``` bash
|
||||
```bash
|
||||
pytest -v --random-order ./tests
|
||||
```
|
||||
|
||||
Note, the python environment needs to be active.
|
||||
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
|
||||
## Test examples
|
||||
|
||||
Writing tests can be quite specific for the given function.
|
||||
Writing tests can be quite specific for the given function.
|
||||
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
|
||||
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
|
||||
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).
|
||||
|
||||
|
||||
@@ -1,31 +1,34 @@
|
||||
# Getting Started with Testing using pytest
|
||||
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
|
||||
It can be install via
|
||||
``` bash
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
|
||||
It can be installed via
|
||||
|
||||
```bash
|
||||
pip install pytest
|
||||
```
|
||||
in your *python environment*.
|
||||
|
||||
in your _python environment_.
|
||||
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
|
||||
|
||||
## Introduction
|
||||
|
||||
Tests in this package should be stored in the `tests` directory.
|
||||
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
|
||||
It is mandatory for test files to begin with `test_` for pytest to discover them.
|
||||
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
|
||||
``` bash
|
||||
```bash
|
||||
pytest -v --random-order ./tests
|
||||
```
|
||||
|
||||
Note, the python environment needs to be active.
|
||||
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
|
||||
## Test examples
|
||||
|
||||
Writing tests can be quite specific for the given function.
|
||||
Writing tests can be quite specific for the given function.
|
||||
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
|
||||
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
|
||||
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).
|
||||
|
||||
|
||||
@@ -1,31 +1,34 @@
|
||||
# Getting Started with Testing using pytest
|
||||
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
|
||||
It can be install via
|
||||
``` bash
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
|
||||
It can be installed via
|
||||
|
||||
```bash
|
||||
pip install pytest
|
||||
```
|
||||
in your *python environment*.
|
||||
|
||||
in your _python environment_.
|
||||
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
|
||||
|
||||
## Introduction
|
||||
|
||||
Tests in this package should be stored in the `tests` directory.
|
||||
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
|
||||
It is mandatory for test files to begin with `test_` for pytest to discover them.
|
||||
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
|
||||
``` bash
|
||||
```bash
|
||||
pytest -v --random-order ./tests
|
||||
```
|
||||
|
||||
Note, the python environment needs to be active.
|
||||
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
|
||||
## Test examples
|
||||
|
||||
Writing tests can be quite specific for the given function.
|
||||
Writing tests can be quite specific for the given function.
|
||||
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
|
||||
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
|
||||
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).
|
||||
|
||||
|
||||
@@ -1,31 +1,34 @@
|
||||
# Getting Started with Testing using pytest
|
||||
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
|
||||
It can be install via
|
||||
``` bash
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
|
||||
It can be installed via
|
||||
|
||||
```bash
|
||||
pip install pytest
|
||||
```
|
||||
in your *python environment*.
|
||||
|
||||
in your _python environment_.
|
||||
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
|
||||
|
||||
## Introduction
|
||||
|
||||
Tests in this package should be stored in the `tests` directory.
|
||||
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
|
||||
It is mandatory for test files to begin with `test_` for pytest to discover them.
|
||||
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
|
||||
``` bash
|
||||
```bash
|
||||
pytest -v --random-order ./tests
|
||||
```
|
||||
|
||||
Note, the python environment needs to be active.
|
||||
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
|
||||
## Test examples
|
||||
|
||||
Writing tests can be quite specific for the given function.
|
||||
Writing tests can be quite specific for the given function.
|
||||
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
|
||||
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
|
||||
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).
|
||||
|
||||
|
||||
@@ -1,31 +1,34 @@
|
||||
# Getting Started with Testing using pytest
|
||||
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
|
||||
It can be install via
|
||||
``` bash
|
||||
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
|
||||
It can be installed via
|
||||
|
||||
```bash
|
||||
pip install pytest
|
||||
```
|
||||
in your *python environment*.
|
||||
|
||||
in your _python environment_.
|
||||
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
|
||||
|
||||
## Introduction
|
||||
|
||||
Tests in this package should be stored in the `tests` directory.
|
||||
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
|
||||
It is mandatory for test files to begin with `test_` for pytest to discover them.
|
||||
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
To run all tests, navigate to the directory of the plugin from the command line, and run the command
|
||||
|
||||
``` bash
|
||||
```bash
|
||||
pytest -v --random-order ./tests
|
||||
```
|
||||
|
||||
Note, the python environment needs to be active.
|
||||
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
|
||||
|
||||
## Test examples
|
||||
|
||||
Writing tests can be quite specific for the given function.
|
||||
Writing tests can be quite specific for the given function.
|
||||
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
|
||||
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
|
||||
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).
|
||||
|
||||
|
||||
Reference in New Issue
Block a user