initial commit

This commit is contained in:
Xiaoqiang Wang
2024-06-20 19:10:58 +02:00
commit 7bb76f2f93
36 changed files with 2037 additions and 0 deletions

3
.git_hooks/post-commit Normal file
View File

@@ -0,0 +1,3 @@
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
semantic-release changelog -D version_variable=$SCRIPT_DIR/../../semantic_release/__init__.py:__version__
semantic-release version -D version_variable=$SCRIPT_DIR/../../semantic_release/__init__.py:__version__

3
.git_hooks/pre-commit Normal file
View File

@@ -0,0 +1,3 @@
black --line-length=100 $(git diff --cached --name-only --diff-filter=ACM -- '*.py')
isort --line-length=100 --profile=black --multi-line=3 --trailing-comma $(git diff --cached --name-only --diff-filter=ACM -- '*.py')
git add $(git diff --cached --name-only --diff-filter=ACM -- '*.py')

180
.gitignore vendored Normal file
View File

@@ -0,0 +1,180 @@
**/*_venv
**/.idea
*.log
**/__pycache__
**/.DS_Store
**/out
**/.vscode
**/.pytest_cache
**/*.egg*
# recovery_config files
recovery_config_*
# file writer data
**.h5
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/**/_build/
docs/**/autodoc/
docs/**/_autosummary/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
**.prof
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

4
.gitlab-ci.yml Normal file
View File

@@ -0,0 +1,4 @@
include:
- file: /templates/plugin-repo-template.yml
inputs: {name: addams_bec, target: addams_bec}
project: bec/awi_utils

0
addams_bec/__init__.py Normal file
View File

View File

@@ -0,0 +1,318 @@
import builtins
import collections
import functools
import math
import numpy
from bec_ipython_client.prettytable import PrettyTable
__all__ = [
'setlat',
'setlambda',
'setmode',
'freeze',
'unfreeze',
'br',
'ubr',
'mvhkl',
'umvhkl',
'ca',
'wh',
'pa',
'orientAdd',
'orientRemove',
'orientShow',
'orientFit'
]
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
# check for diffractometer device
diffract = None
if dev is not None:
if 'x04h' in dev:
diffract = dev.x04h
elif 'x04v' in dev:
diffract = dev.x04v
if diffract is not None:
RealPosition = collections.namedtuple('RealPosition', ' '.join(diffract.get_real_positioners()))
def freeze(
angle: float | None
):
"""
Freeze the value of the mode dependent angle, so when calculating motor positions
corresponding to an arbitrary (H, K, L ), the angle will be reset to the frozen value
before the calculation no matter what the current position of the diffractometer.
"""
diffract.freeze(angle)
def unfreeze():
"""
Subsequent angle calculations will use whatever the current value of the associated
constrained angle for the cur- rent mode.
"""
diffract.unfreeze()
def setlat(
a: float, b: float, c: float, alpha: float, beta: float, gamma: float
):
"""
Set sample lattice parameters
"""
diffract.set_lattice((a, b, c, alpha, beta, gamma))
def setlambda(
wavelength: float
):
"""
Set the x-ray wavelength (in Angstroms)
"""
if wavelength <= 0:
print('Invalid input: wavelength <=0!')
return
current_wavelength = diffract.get_wavelength()
if math.isclose(wavelength, current_wavelength):
print(f'Still using {current_wavelength} A')
else:
diffract.set_wavelength(wavelength)
print(f'Lambda reset from {current_wavelength} to {wavelength} A')
def setmode(
mode: int
):
"""
Set the geometry mode
"""
if mode < 0 or mode > 2:
print('Valid mode is from 0 to 2')
return
current_mode = diffract.get_mode()
if mode == current_mode:
print(f'Still using mode {current_mode}')
else:
diffract.set_mode(mode)
print(f'Mode reset from {current_mode} to {mode}')
def mvhkl(
h: float, k: float, l: float, auto=False
):
"""
Move to the reciprocol space coordinates
"""
try:
angles = diffract.forward(h, k, l)[:-2]
except Exception as exc:
print(f'{h} {k} {l} is not obtainable: {exc}')
return
if not auto:
for axis, current, target in zip(RealPosition._fields, _currentPosition(), angles):
print('%7s = %9.4f --> %9.4f' % (axis, current, target))
answer = input('Move to these values? [Y/n]: ')
if answer.startswith(('N', 'n')):
print('Move abandoned.')
return
br(h, k, l)
def br(
h: float, k: float, l: float
):
"""
Move to the reciprocol space coordinates
"""
args = []
angles = diffract.forward(h, k, l)[:-2]
for axis, value in zip(RealPosition._fields, angles):
motor = getattr(dev, axis)
args.extend((motor, value))
scans.mv(*args, relative=False)
def ubr(
h: float, k: float, l: float
):
"""
Move to the reciprocol space coordinates with updates
"""
args = []
angles = diffract.forward(h, k, l)[:-2]
for axis, value in zip(RealPosition._fields, angles):
motor = getattr(dev, axis)
args.extend((motor, value))
scans.umv(*args, relative=False)
def umvhkl(
h: float, k: float, l: float, auto=False
):
"""
Move to the reciprocol space coordinates with updates
"""
try:
angles = diffract.forward(h, k, l)[:-2]
except Exception as exc:
print(f'{h} {k} {l} is not obtainable: {exc}')
return
if not auto:
for axis, current, target in zip(RealPosition._fields, _currentPosition(), angles):
print('%7s = %9.4f --> %9.4f' % (axis, current, target))
answer = input('Move to these values? [Y/n]: ')
if answer.startswith(('N', 'n')):
print('Move abandoned.')
return
ubr(h, k, l)
def ca(
h: float, k: float, l: float
):
"""
Calculate angle positions for a given point in reciprocol space
"""
angles = diffract.forward(h, k, l)
print("\nCalculated positions:\n")
print(f'H K L = {h} {k} {l}')
print('BetaIn = %.5f BetaOut = %.5f' %(angles[-2], angles[-1]))
print('Lambda = %.3f' % diffract.get_wavelength())
print()
_showAngles(angles[:-2])
def wh():
"""
Show where principal axes and reciprocal space
"""
h = diffract.h.position
k = diffract.k.position
l = diffract.l.position
betaIn = diffract.betaIn.position
betaOut = diffract.betaOut.position
print(f'H K L = {h:.4f} {k:.4f} {l:.4f}')
print('BetaIn = %.5f BetaOut = %.5f' %(betaIn, betaOut))
print('Lambda = %.3f' % diffract.get_wavelength())
print()
_showAngles()
def pa():
"""
Show geometry parameters
"""
if diffract.name == 'x04v':
print('x04v (Newport Microcontrols 2+3 at SLS) vertical geometry')
elif diffract.name == 'x04h':
print('x04h (Newport Microcontrols 2+3 at SLS) horizontal geometry')
match mode := diffract.get_mode():
case 0:
print(f' BetaIn Fixed (mode {mode})')
case 1:
print(f' BetaOut Fixed (mode {mode})')
case 2:
print(f' BetaIn equals BetaOut (mode {mode})')
if beta_frozen := diffract.get_frozen():
print(f' Frozen coordinate: {beta_frozen}')
def orientShow():
"""
Display list of measured reflections
"""
print('\n(Using lattice constants:)')
lattice = diffract.get_lattice()
print('a = %.4g, b = %.4g, b = %.4g, alpha = %.6g, beta = %.6g, gamma = %.6g' %
(lattice[0], lattice[1], lattice[2], lattice[3], lattice[4], lattice[5]))
print("\n------------------------------------------------\n")
reflections = diffract.get_reflections()
if reflections:
print('The defined reflections are:')
header = ['h', 'k', 'l'] + diffract.real_position['fields'][:-1]
table = PrettyTable(header, padding=12)
print(' ', table.get_header())
for reflection in reflections:
h, k, l, angles = reflection
text = [f'{h:9.4f}', f'{k:9.4f}', f'{l:9.4f}'] + [f'{x:9.4f}' for x in angles]
print(' ', table.get_row(*text))
print("\n------------------------------------------------\n")
_showUB()
def orientRemove(
h: float, k: float, l: float
):
"""
Remove a measured reflection from the list
"""
diffract.remove_reflection(h, k, l)
def orientAdd(
h: float, k: float, l: float, *args
):
"""
Add a reflection to the list of measured reflections
"""
angles = args
if not angles:
response = diffract.real_position
# The original return value is of namedtuple type,
# which gets serialized to a dictionary by the device server.
angles = tuple(response['values'][axis] for axis in response['fields'] if axis != 'nu')
if len(angles) < 4:
print('Please specify all angles')
return
diffract.add_reflection(h, k, l, angles)
def orientFit():
"""
Fit UB matrix from given reflections
"""
reflections = diffract.get_reflections()
if len(reflections) < 2:
print("There are not enough reflections defined.")
return
diffract.compute_UB()
_showUB()
def _showUB():
UB = diffract.get_UB()
print('Orientation matrix by row:')
print(' Row 1: %8.5f %8.5f %8.5f' % (UB[0,0], UB[0,1], UB[0,1]))
print(' Row 2: %8.5f %8.5f %8.5f' % (UB[1,0], UB[1,1], UB[1,1]))
print(' Row 3: %8.5f %8.5f %8.5f' % (UB[2,0], UB[2,1], UB[2,1]))
def _showAngles(angles=None):
if angles is None:
angles = _currentPosition()
table = PrettyTable(RealPosition._fields, padding=12)
print(table.get_header())
text = tuple(f'{x:9.4f}' for x in angles)
print(table.get_row(*text))
def _currentPosition():
response = diffract.real_position
# The original return value is of namedtuple type,
# which gets serialized to a dictionary by the device server.
angles = RealPosition(*(response['values'][axis] for axis in response['fields']))
return angles

View File

@@ -0,0 +1,38 @@
"""
Post startup script for the BEC client. This script is executed after the
IPython shell is started. It is used to load the beamline specific
information and to setup the prompts.
The script is executed in the global namespace of the IPython shell. This
means that all variables defined here are available in the shell.
While command-line arguments have to be set in the pre-startup script, the
post-startup script can be used to load beamline specific information and
to setup the prompts.
from bec_lib.logger import bec_logger
logger = bec_logger.logger
# pylint: disable=import-error
_args = _main_dict["args"]
_session_name = "cSAXS"
if _args.session.lower() == "lamni":
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
from csaxs_bec.bec_ipython_client.plugins.LamNI import *
_session_name = "LamNI"
lamni = LamNI(bec)
logger.success("LamNI session loaded.")
elif _args.session.lower() == "csaxs":
print("Loading cSAXS session")
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
logger.success("cSAXS session loaded.")
"""
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module
bec.load_high_level_interface("hkl_hli")

View File

@@ -0,0 +1,23 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to add additional command line arguments.
"""
from bec_lib.service_config import ServiceConfig
def extend_command_line_args(parser):
"""
Extend the command line arguments of the BEC client.
"""
# parser.add_argument("--session", help="Session name", type=str, default="cSAXS")
return parser
# def get_config() -> ServiceConfig:
# """
# Create and return the service configuration.
# """
# return ServiceConfig(redis={"host": "localhost", "port": 6379})

View File

View File

View File

@@ -0,0 +1,11 @@
import os
def setup_epics_ca():
# os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "NO"
# os.environ["EPICS_CA_ADDR_LIST"] = "129.129.122.255 sls-x12sa-cagw.psi.ch:5836"
os.environ["PYTHONIOENCODING"] = "latin1"
def run():
setup_epics_ca()

View File

View File

@@ -0,0 +1,91 @@
phi:
readoutPriority: baseline
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X04SA-ES3-XPS:PHI'
enabled: true
readOnly: false
oh:
readoutPriority: baseline
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X04SA-ES3-XPS:OMEGAH'
enabled: true
readOnly: false
delta:
readoutPriority: baseline
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X04SA-ES3-XPS:DELTA'
enabled: true
readOnly: false
gam:
readoutPriority: baseline
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X04SA-ES3-XPS:GAMMA'
enabled: true
readOnly: false
nu:
readoutPriority: baseline
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X04SA-ES3-XPS:NU'
enabled: true
readOnly: false
x04h:
readoutPriority: baseline
deviceClass: addams_bec.devices.diffract.X04H
deviceConfig:
prefix: ''
deviceTags:
- hkl
enabled: true
readOnly: false
h:
readoutPriority: monitored
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def compute_signals(signal):\n return signal.get()\n"
input_signals:
- "x04h_h_readback"
metadata:
precision: 4
deviceTags:
- hkl
enabled: true
readOnly: true
k:
readoutPriority: monitored
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def compute_signals(signal):\n return signal.get()\n"
input_signals:
- "x04h_k_readback"
metadata:
precision: 4
deviceTags:
- hkl
enabled: true
readOnly: true
l:
readoutPriority: monitored
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def compute_signals(signal):\n return signal.get()\n"
input_signals:
- "x04h_l_readback"
metadata:
precision: 4
deviceTags:
- hkl
enabled: true
readOnly: true

View File

@@ -0,0 +1,100 @@
eiger:
readoutPriority: baseline
deviceClass: addams_bec.devices.detector.Eiger500K
deviceConfig:
prefix: "X04SA-ES3-EIGER:"
enabled: true
readOnly: false
softwareTrigger: true
ov:
readoutPriority: baseline
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: "X04SA-ES3-XPS:OMEGAV"
enabled: true
readOnly: false
alp:
readoutPriority: baseline
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: "X04SA-ES3-XPS:ALPHA"
enabled: true
readOnly: false
delta:
readoutPriority: baseline
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: "X04SA-ES3-XPS:DELTA"
enabled: true
readOnly: false
gam:
readoutPriority: baseline
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: "X04SA-ES3-XPS:GAMMA"
enabled: true
readOnly: false
nu:
readoutPriority: baseline
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: "X04SA-ES3-XPS:NU"
enabled: true
readOnly: false
x04v:
readoutPriority: baseline
deviceClass: addams_bec.devices.diffract.X04V
deviceConfig:
prefix: ""
deviceTags:
- hkl
enabled: true
readOnly: false
h:
readoutPriority: monitored
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def compute_signals(signal):\n return signal.get()\n"
input_signals:
- "x04v_h_readback"
metadata:
precision: 4
deviceTags:
- hkl
enabled: true
readOnly: true
k:
readoutPriority: monitored
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def compute_signals(signal):\n return signal.get()\n"
input_signals:
- "x04v_k_readback"
metadata:
precision: 4
deviceTags:
- hkl
enabled: true
readOnly: true
l:
readoutPriority: monitored
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def compute_signals(signal):\n return signal.get()\n"
input_signals:
- "x04v_l_readback"
metadata:
precision: 4
deviceTags:
- hkl
enabled: true
readOnly: true

View File

611
addams_bec/devices/calc.py Normal file
View File

@@ -0,0 +1,611 @@
"""
Angular calculation for X04SA surface diffractometer.
The calculation is detailed in J. Appl. Cryst. 44, 73-83 (2011).
The SPEC implemention in C is at https://git.psi.ch/spec/spec/-/tree/master/sls.
The SPEC functions, i.e. atoQ, Qtoa and ang_to_hphi, are converted to Python.
The C code structrure and comments are kept whenever possible.
"""
import collections
import enum
from math import pi, sin, cos, tan, asin, acos, atan, atan2, fabs, sqrt
from typing import Tuple
import numpy
RAD = pi / 180 # convert degree to radian
RAD_1 = 180 / pi # convert radian to degree
VERY_SMALL = 1e-15
TWOPI = 2 * pi
class Mode(enum.IntEnum):
"""
Incidience and emergence angle modes
"""
BETAIN_FIX = 0 #: given betaIn, find others
BETAOUT_FIX = 1 #: given betaOut, find others
BETAIN_EQ_BETAOUT = 2 #: betaIn equals betaOut
class NuMode(enum.IntEnum):
"""
Nu motor modes
"""
NO = 0 #: no rotation
L_FIX = 1 #: static l-projection angle on the pixel detector
FOOTPRINT_FIX = 2 #: static footprint-projection on the pixel detector
class Crystal:
"""
Crystal lattic parameters
"""
def __init__(self, a1: float, a2: float, a3: float,
alpha1: float, alpha2: float, alpha3: float):
self.lattice = (a1, a2, a3, alpha1, alpha2, alpha3)
@property
def lattice(self):
return self.a1, self.a2, self.a3, self.alpha1, self.alpha2, self.alpha3
@lattice.setter
def lattice(self, new_lattice):
self.a1, self.a2, self.a3, self.alpha1, self.alpha2, self.alpha3 = new_lattice
self._compute_B()
def _compute_B(self):
alpha1 = self.alpha1 * RAD
alpha2 = self.alpha2 * RAD
alpha3 = self.alpha3 * RAD
# Calculate the reciprocal lattice parameters
beta2 = acos((cos(alpha1) * cos(alpha3) - cos(alpha2)) / (sin(alpha1) * sin(alpha3)))
beta3 = acos((cos(alpha1) * cos(alpha2) - cos(alpha3)) / (sin(alpha1) * sin(alpha2)))
volume = self.a1 * self.a2 * self.a3 * \
sqrt(1 + 2 * cos(alpha1) * cos(alpha2) * cos(alpha3)
- cos(alpha1) ** 2 - cos(alpha2) ** 2 - cos(alpha3) ** 2)
b1 = 2 * pi * self.a2 * self.a3 * sin(alpha1) / volume
b2 = 2 * pi * self.a1 * self.a3 * sin(alpha2) / volume
b3 = 2 * pi * self.a1 * self.a2 * sin(alpha3) / volume
# Calculate the BMatrix from the direct and reciprocal parameters.
# Reference: Eq.3 of Busing and Levy, Acta Cryst. 22, 457-464 (1967).
self.B = numpy.array([
[b1, b2 * cos(beta3), b3 * cos(beta2)],
[0.0, b2 * sin(beta3), -b3 * sin(beta2) * cos(alpha1)],
[0.0, 0.0, 2 * pi / self.a3],
])
def __str__(self):
return f"a = {self.a1}, b = {self.a2}, c = {self.a3}, alpha = {self.alpha1}, beta = {self.alpha2}, gamma = {self.alpha3} "
class Geometry:
"""
A diffractometer interface to convert between reciprocol space and real space.
"""
@property
def Position(self):
return collections.namedtuple("Position", ','.join(self.axes))
def Qtoa(self,
hphi: Tuple[float, float, float],
beta: Tuple[float, float],
wavelength: float) -> Tuple[Tuple[float, float, float, float, float], Tuple[float, float]]:
"""
Convert from reciprocol space to angles.
"""
if wavelength <= 0:
raise ValueError("Incident wave-length is <=0")
q = TWOPI / wavelength
hphi /= q
return self.hphi_to_ang(hphi, beta)
def atoQ(self,
a: Tuple[float, float, float, float],
wavelength: float) -> Tuple[Tuple[float, float, float], Tuple[float, float]]:
"""
Convert from angles to reciprocal space
:param a: diffractometer angles
:param beta: incidence and emergence angles
:return: (h,k,l), (betaIn, betaOut)
"""
if wavelength <= 0:
raise ValueError("Incident wave-length is <=0")
# calculate corresponding reciprocal space coordinates
hphi = self.ang_to_hphi(a)
# calculate incidence and emergence angles
_, _, l = hphi
BETAIN = a[1]
sbi = sin(BETAIN * RAD)
sbo = l - sbi
if fabs(sbo) > 1:
raise ValueError("|sin(betaOut)| > 1 --> betaOut > 90 deg!")
BETAOUT = asin(sbo) * RAD_1 # betaOut in deg
q = TWOPI / wavelength
hphi *= q
return hphi, (BETAIN, BETAOUT)
def hphi_to_ang(self, hphi: Tuple[float, float, float], beta: Tuple[float, float]):
"""
Convert from orthonormal cartesian coordinates in reciprocal space to angles
"""
raise NotImplementedError()
def ang_to_hphi(self, angles: Tuple[float, float, float, float, float]):
"""
Create normalized momentum transfer Qphi from spectrometer angles.
"""
raise NotImplementedError()
def _locate(self, a, min):
"""
Cut points for angles
"""
if a < min:
a += 360
return a
class GeometryX04V(Geometry):
"""
X04SA SD vertical geometry
"""
axes = ('ov', 'alp', 'delta', 'gam', 'nu')
def __init__(self):
self.mode = Mode.BETAIN_FIX
self.nu_rot = NuMode.NO
def hphi_to_ang(self, hphi, beta):
h, k, l = hphi
BETAIN, BETAOUT = beta
h2 = h * h
k2 = k * k
l2 = l * l
# calculate Y (common for all modes)
Y = -0.5 * (h2 + k2 + l2)
# calculate BETAIN and BETAOUT depending on the mode
match self.mode:
case Mode.BETAIN_FIX:
sbi = sin(BETAIN * RAD)
sbo = l - sbi
if fabs(sbo) > 1:
raise ValueError("|sin(betaOut)| > 1 --> betaOut > 90 deg!")
BETAOUT = asin(sbo) * RAD_1 # betaOut in deg
case Mode.BETAOUT_FIX:
sbo = sin(BETAOUT * RAD)
sbi = l - sbo
if fabs(sbi) > 1:
raise ValueError("|sin(betaIn)| > 1 --> incidence angle > 90 deg!")
BETAIN = asin(sbi) * RAD_1
case Mode.BETAIN_EQ_BETAOUT:
sbi = l / 2
if fabs(sbi) > 1:
raise ValueError("|L/2| > 1")
sbo = sbi
BETAIN = BETAOUT = asin(sbi) * RAD_1
cbi = cos(BETAIN * RAD)
if fabs(cbi) < VERY_SMALL:
raise ValueError("cos(betaIn) = 0 --> angle of incidence is 90 deg!")
# calculate Z and X based on BETAIN and BETAOUT */
sa = sbi
alp = asin(sbi)
ca = cos(alp)
Z = (((Y + 1) * sbi + sbo) / ca)
A = (ca * Y + sa * Z)
A2 = A * A
if (h2 + k2 - A2) < VERY_SMALL:
raise ValueError("X is NaN")
X = sqrt(h2 + k2 - A2)
X2 = X * X
# calculate angles
# gam
if fabs(Z) < VERY_SMALL and fabs(Y+1) < VERY_SMALL:
raise ValueError("gam = atan2(0,0)")
gam = atan2(Z,(Y+1))
# del
sd = X
if fabs(sin(gam)) < VERY_SMALL:
raise ValueError("cos(del) is infinite")
cd = Z / sin(gam)
if fabs(sd) < VERY_SMALL and fabs(cd) < VERY_SMALL:
raise ValueError("del = atan2(0,0)")
delta = atan2(sd,cd)
# ov
sov = (k * X - h * A) / (A2 + X2)
cov = (h * X + k * A) / (A2 + X2)
if fabs(sov) < VERY_SMALL and fabs(cov) < VERY_SMALL:
raise ValueError("ov = atan2(0,0)")
ov = atan2(sov,cov)
# alp, nu
alp = asin(sbi)
nu = numpy.nan # set new nu angle only when nu is allowed to rotate
if self.nu_rot == NuMode.L_FIX:
nu = atan(-tan(gam - alp) * sin(delta))
elif self.nu_rot == NuMode.FOOTPRINT_FIX:
if fabs(sd) < VERY_SMALL and fabs(sin(gam - alp)) < VERY_SMALL:
nu = 0.0
else:
nu = atan2(sd*cos(gam - alp),sin(gam - alp))
GAM = gam * RAD_1
DEL = delta * RAD_1
ALP = alp * RAD_1
OV = ov * RAD_1
NU = nu * RAD_1
# cut points for angles
GAM = self._locate(GAM, -90)
DEL = self._locate(DEL, -90)
ALP = self._locate(ALP, -90)
OV = self._locate(OV, -180)
NU = self._locate(NU, -90)
return self.Position(OV, ALP, DEL, GAM, NU), (BETAIN, BETAOUT)
def ang_to_hphi(self, angles):
"""
Create normalized momentum transfer Qphi from spectrometer angles.
"""
a = numpy.array(angles, dtype=float)
a *= RAD
cov = cos(a[0]); sov = sin(a[0])
ca = cos(a[1]); sa = sin(a[1])
cd = cos(a[2]); sd = sin(a[2])
cg = cos(a[3]); sg = sin(a[3])
X = sd
Y = (cd * cg) - 1
Z = cd * sg
hphi = numpy.zeros(3)
hphi[0] = X * cov - (Y * ca + Z * sa) * sov
hphi[1] = X * sov + (Y * ca + Z * sa) * cov
hphi[2] = Z * ca - Y * sa
return hphi
class GeometryX04H(Geometry):
axes = ('phi', 'oh', 'delta', 'gam', 'nu')
def __init__(self):
self.mode = Mode.BETAIN_FIX
self.nu_rot = NuMode.NO
def hphi_to_ang(self, hphi, beta):
"""
Convert from orthonormal cartesian coordinates in reciprocal space to angles
"""
h, k, l = hphi
BETAIN, BETAOUT = beta
h2 = h * h
k2 = k * k
l2 = l * l
# Eq. 45: calculate X (common for all modes)
Y = -0.5 * (h2 + k2 + l2)
# Eq. 46: calculate BETAIN and BETAOUT depending on the mode
match self.mode:
case Mode.BETAIN_FIX:
sbi = sin(BETAIN * RAD)
sbo = l - sbi
if fabs(sbo) > 1:
raise ValueError("|sin(betaOut)| > 1 --> betaOut > 90 deg!")
BETAOUT = asin(sbo) * RAD_1; # betaOut in deg
case Mode.BETAOUT_FIX:
sbo = sin(BETAOUT * RAD)
sbi = l - sbo
if fabs(sbi) > 1:
raise ValueError("|sin(betaIn)| > 1 --> incidence angle > 90 deg!")
BETAIN = asin(sbi) * RAD_1
case Mode.BETAIN_EQ_BETAOUT:
sbi = l / 2
if fabs(sbi) > 1:
raise ValueError("|L/2| > 1")
sbo = sbi
BETAIN = BETAOUT = asin(sbi) * RAD_1
cbi = cos(BETAIN * RAD)
if fabs(cbi) < VERY_SMALL:
raise ValueError("cos(betaIn) = 0 --> angle of incidence is 90 deg!")
# Eq. 47-48: calculate Z and X based on BETAIN and BETAOUT
soh = sbi
oh = asin(sbi)
coh = cos(oh)
Z = (((Y + 1) * sbi + sbo) / coh)
A = (cbi * Y + sbi * Z)
A2 = A * A
if h2 + k2 - A2 < VERY_SMALL:
raise ValueError("Y is NaN")
X = -sqrt(h2 + k2 - A2)
X2 = X * X
# calculate angles
# Eq. 49: gam
sg = -X
cg = Y + 1
if fabs(X) < VERY_SMALL and fabs(Y+1) < VERY_SMALL:
raise ValueError("gam = atan2(0,0)")
gam = atan2(-X,(Y+1))
# Eq. 50: delta
sd = Z
cd = -X / sin(gam)
if fabs(sd) < VERY_SMALL and fabs(cd) < VERY_SMALL:
raise ValueError("del = atan2(0,0)")
delta = atan2(sd,cd)
# Eq. 51: phi
sp = (h * A - k * X) / (A2 + X2)
cp = (h * X + k * A) / (A2 + X2)
if fabs(sp) < VERY_SMALL and fabs(cp) < VERY_SMALL:
raise ValueError("phi = atan2(0,0)")
phi = atan2(sp,cp)
# Eq. 52: oh
oh = asin(sbi)
# Eq. nu
nu = numpy.nan
if self.nu_rot == NuMode.L_FIX:
if (fabs(-sg * soh) < VERY_SMALL and
fabs(soh * cg * sd + coh * cd) < VERY_SMALL):
nu = 0.0
else:
nu = atan2(-sg * soh , soh * cg * sd + coh * cd)
elif self.nu_rot == NuMode.FOOTPRINT_FIX:
if (fabs(cg * sin(delta - oh)) < VERY_SMALL and
fabs(sg) < VERY_SMALL):
nu = 0.0
else:
nu = atan2(cg*sin(delta - oh),sg)
PHI = phi * RAD_1
OH = oh * RAD_1
GAM = gam * RAD_1
DEL = delta * RAD_1
NU = nu * RAD_1
# cut points for angles
PHI = self._locate(PHI, -180)
OH = self._locate(OH, -90)
DEL = self._locate(DEL, -90)
GAM = self._locate(GAM, -90)
NU = self._locate(NU, -90)
return self.Position(PHI, OH, DEL, GAM, NU), (BETAIN, BETAOUT)
def ang_to_hphi(self, angles):
"""
Create normalized momentum transfer Qphi from spectrometer angles.
"""
a = numpy.array(angles)
a *= RAD
cp = cos(a[0]); sp = sin(a[0])
coh = cos(a[1]); soh = sin(a[1])
cd = cos(a[2]); sd = sin(a[2])
cg = cos(a[3]); sg = sin(a[3])
# Eq. 43
X = -cd * sg
Y = cd * cg -1
Z = sd
# Eq. 44
hphi = numpy.zeros(3)
hphi[0] = cp * X + sp * (coh * Y + soh * Z)
hphi[1] = -sp * X + cp * (coh * Y + soh * Z)
hphi[2] = coh * Z - soh * Y
return hphi
class RecipCalc:
geometry_class: Geometry = None
def __init__(self):
self._geometry: Geometry = self.geometry_class()
self._crystal = Crystal(TWOPI, TWOPI, TWOPI, 90, 90, 90)
self._wavelength = 1
self.reflections = []
self.U = self.UB = self.UBinv = numpy.identity(3)
@property
def mode(self):
return self._geometry.mode
@mode.setter
def mode(self, mode):
self._geometry.mode = mode
@property
def nu_rot(self):
return self._geometry.nu_rot
@nu_rot.setter
def nu_rot(self, nu_rot):
self._geometry.nu_rot = nu_rot
@property
def lattice(self):
return self._crystal.lattice
@lattice.setter
def lattice(self, lattice):
self._crystal.lattice = lattice
self._update_UB()
@property
def wavelength(self):
return self._wavelength
@wavelength.setter
def wavelength(self, wavelength):
self._wavelength = wavelength
def _update_UB(self):
if self.U is not None:
self.UB = self.U @ self._crystal.B
self.UBinv = numpy.linalg.inv(self.UB)
def angles(self, h: float, k: float, l: float, beta: Tuple[float, float]):
"""
Calculate angles from H, K and L.
"""
hh = numpy.zeros(3) # scattering vector in lattice units
hphi = numpy.zeros(3) # scattering vector in orthonormal cartesian coordinates
# in reciprocal space
hh[0] = h
hh[1] = k
hh[2] = l
# Convert from crystal lattice to orthonormal cartesian coordinates
# in reciprocal space by applying the UB matrix to (H K L)
hphi = numpy.matmul(self.UB, hh)
# Angles from HKL
angles = self._geometry.Qtoa(hphi, beta, self._wavelength)
return angles
def hkl(self, angles: Tuple[float, float, float, float]) -> Tuple[Tuple[float, float, float], Tuple[float, float]]:
"""
Calculate H, K and L from angles.
"""
# Convert from angles to orthonormal cartesian coordinates in reciprocal space
hphi, beta = self._geometry.atoQ(angles, self._wavelength)
# Convert from orthonormal cartesian coordinates in reciprocal
# space to the crystal lattice positions by applying the inverse
# of the UB matrix on hphi = (h' k' l')
hh = numpy.matmul(self.UBinv, hphi)
return hh, beta
def add_reflection(self, h: float, k: float, l: float, angles: Tuple[float, float, float, float]):
"""
Add a reflection to the list. If the reflection already exists, replace it with the new angles.
"""
for i in range(len(self.reflections)):
if numpy.allclose(self.reflections[i][:3], (h, k, l)):
self.reflections[i] = (h, k, l, angles)
return
self.reflections.append((h, k, l, angles))
def remove_reflection(self, h: float, k: float, l: float):
"""
Remove a reflection from the list.
"""
for i in range(len(self.reflections)):
if numpy.allclose(self.reflections[i][:3], (h, k, l)):
del self.reflections[i]
def clear_relections(self):
"""
Remove all reflections
"""
self.reflections.clear()
def compute_UB(self):
"""
"""
# Hp = U @ Hc
Hc = numpy.empty(shape=(3,0)) # in crystal system
Hp = numpy.empty(shape=(3,0)) # in phi system
for ref in self.reflections:
h, k, l, angles = ref
Hc = numpy.c_[Hc, self._crystal.B @ [h, k, l]]
Hp = numpy.c_[Hp, self._geometry.atoQ(angles, self._wavelength)[0]]
if len(self.reflections) == 2:
# Busing & Levy, Acta Cryst. (1967). 22, 457
h1c = Hc[:,0]
h2c = Hc[:,1]
u1p = Hp[:,0]
u2p = Hp[:,1]
# Create modified unit vectors t1, t2 and t3 in crystal and phi systems
t1c = h1c
t3c = numpy.cross(h1c, h2c)
t2c = numpy.cross(t3c, t1c)
t1p = u1p
t3p = numpy.cross(u1p, u2p)
t2p = numpy.cross(t3p, t1p)
# ...and nornmalise and check that the reflections used are appropriate
def __normalise(m):
d = numpy.linalg.norm(m, axis=0)
if any(d < 1e-7):
return
return m / d
Tc = __normalise(numpy.column_stack((t1c, t2c, t3c)))
Tp = __normalise(numpy.column_stack((t1p, t2p, t3p)))
U = Tp @ numpy.linalg.inv(Tc)
elif len(self.reflections) == 3: # determined system
U = Hp @ numpy.linalg.inv(Hc)
elif len(self.reflections) > 3: # over-determined system
U = numpy.transpose((numpy.linalg.inv(Hc @ Hc.T) @ Hc ) @ Hp.T) # linear least squares
self.U = U
self._update_UB()
class CalcX04V(RecipCalc):
geometry_class = GeometryX04V
class CalcX04H(RecipCalc):
geometry_class = GeometryX04H

View File

@@ -0,0 +1,91 @@
from ophyd import (
ADComponent as ADCpt,
Device,
DeviceStatus,
CamBase,
DetectorBase,
)
from ophyd_devices.devices.sls_detector import SLSDetector
from ophyd_devices.devices.areadetector.plugins import (
ImagePlugin_V35 as ImagePlugin,
StatsPlugin_V35 as StatsPlugin,
HDF5Plugin_V35 as HDF5Plugin,
ROIPlugin_V35 as ROIPlugin,
)
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from bec_lib import bec_logger
logger = bec_logger.logger
DETECTOR_TIMEOUT = 5
class Eiger500KSetup(CustomDetectorMixin):
def __init__(self, *args, parent:Device = None, **kwargs):
super().__init__(*args, parent=parent, **kwargs)
def on_stage(self):
# camera acquisition parameters
self.parent.cam.image_mode.put(1) # Multiple
self.parent.cam.acquire_time.put(self.parent.scaninfo.exp_time)
# self.parent.cam.frames_per_trigger
# file writer
self.parent.hdf.lazy_open.put(1)
self.parent.hdf.num_capture.put(self.parent.scaninfo.num_points)
self.parent.hdf.file_write_mode.put(2) # Stream
self.parent.hdf.capture.put(1, wait=False)
logger.success('XXXX stage %d XXXX')
def on_trigger(self):
self.parent.cam.acquire.put(1, wait=False)
logger.success('XXXX trigger XXXX')
return self.wait_with_status(
[(self.parent.cam.acquire.get, 0)],
self.parent.scaninfo.exp_time + DETECTOR_TIMEOUT,
all_signals=True
)
def on_complete(self):
status = DeviceStatus(self.parent)
success = self.wait_for_signals(
[(self.parent.cam.acquire.get, 0), (self.parent.hdf.capture.get, 0)],
DETECTOR_TIMEOUT,
all_signals=True
)
self.parent.filepath.put(self.parent.hdf.full_file_name.get())
self.publish_file_location(done=True, successful=success)
logger.success('XXXX complete %d XXXX' % success)
if success:
status.set_finished()
else:
status.set_exception(TimeoutError())
return status
def on_stop(self):
logger.success('XXXX stop XXXX')
self.parent.cam.acquire.put(0)
self.parent.hdf.capture.put(0)
def on_unstage(self):
self.parent.hdf.capture.put(0)
logger.success('XXXX unstage XXXX')
class CamBase_V3(CamBase):
pool_max_buffers = None
class Eiger500K(PSIDetectorBase):
"""
"""
custom_prepare_cls = Eiger500KSetup
cam = ADCpt(CamBase_V3, 'cam1:')
#image = ADCpt(ImagePlugin, 'image1:')
#roi1 = ADCpt(ROIPlugin, 'ROI1:')
#roi2 = ADCpt(ROIPlugin, 'ROI2:')
#stats1 = ADCpt(StatsPlugin, 'Stats1:')
#stats2 = ADCpt(StatsPlugin, 'Stats2:')
hdf = ADCpt(HDF5Plugin, 'HDF1:')

View File

@@ -0,0 +1,159 @@
import numpy
from ophyd import PseudoSingle, PseudoPositioner, SoftPositioner, Component as Cpt, EpicsMotor
from ophyd.signal import AttributeSignal
from ophyd.pseudopos import (real_position_argument, pseudo_position_argument)
from .calc import Mode, RecipCalc, CalcX04V, CalcX04H
class Diffractometer(PseudoPositioner):
"""
Diffractometer pseudopositioner
"""
calc_class = None
h = Cpt(PseudoSingle, "", kind="hinted")
k = Cpt(PseudoSingle, "", kind="hinted")
l = Cpt(PseudoSingle, "", kind="hinted")
betaIn = Cpt(PseudoSingle, "", kind="hinted")
betaOut = Cpt(PseudoSingle, "", kind="hinted")
real_axes = Cpt(AttributeSignal, attr='_real_axes', write_access=False)
USER_ACCESS = ['forward', 'inverse', 'real_position', 'get_real_positioners',
'angles_from_hkls',
'freeze', 'unfreeze', 'get_frozen',
'set_mode', 'get_mode',
'set_nu_rot', 'get_nu_rot',
'set_lattice', 'get_lattice',
'set_wavelength', 'get_wavelength',
'get_reflections', 'add_reflection', 'remove_reflection',
'compute_UB', 'get_UB'
]
def __init__(self, prefix, **kwargs):
self.calc: RecipCalc = self.calc_class()
self.frozen = False
self.beta_frozen = [None, None]
super().__init__(
prefix,
**kwargs
)
self._real_axes = self.real_positioners._fields
return
def freeze(self, angle: float | None):
self.frozen = True
match self.calc.mode:
case Mode.BETAIN_FIX:
self.beta_frozen[0] = angle if angle is not None else self.betaIn.position
case Mode.BETAOUT_FIX:
self.beta_frozen[1] = angle if angle is not None else self.betaOut.position
def unfreeze(self):
self.frozen = False
def get_frozen(self):
if self.frozen and self.calc.mode != Mode.BETAIN_EQ_BETAOUT:
return self.beta_frozen[self.calc.mode]
def get_lattice(self):
return self.calc.lattice
def set_lattice(self, lattice):
self.calc.lattice = lattice
def set_mode(self, mode):
self.calc.mode = mode
# unfreeze if a frozen angle has not been saved for this mode,
if self.beta_frozen[mode] is None:
self.unfreeze()
def get_mode(self):
return self.calc.mode
def set_nu_rot(self, nu_rot):
self.calc.nu_rot = nu_rot
def get_nu_rot(self):
return self.calc.nu_rot
def set_wavelength(self, wavelength):
self.calc.wavelength = wavelength
def get_wavelength(self):
return self.calc.wavelength
def get_reflections(self):
return self.calc.reflections
def add_reflection(self, h, k, l, angles):
self.calc.add_reflection(h, k, l, angles)
def remove_reflection(self, h, k, l):
self.calc.remove_reflection(h, k, l)
def compute_UB(self):
self.calc.compute_UB()
def get_UB(self):
return self.calc.UB
def get_real_positioners(self):
return self.RealPosition._fields
def angles_from_hkls(self, hkls):
angles = []
for hkl in hkls:
real_pos = self.forward(hkl)
angles.append(real_pos)
return angles
@pseudo_position_argument
def forward(self, pseudo_pos):
hkl = pseudo_pos[:3]
beta = pseudo_pos[3:]
if self.frozen:
beta = self.beta_frozen
angles, beta = self.calc.angles(*hkl, beta)
if numpy.isnan(angles.nu):
angles = angles._replace(nu=self.nu.position)
return *angles, *beta
@real_position_argument
def inverse(self, real_pos):
# when initialized in bec device server, it does not wait for connection and
# access the position property with real_pos filled with None.
if None in real_pos:
return self.PseudoPosition(numpy.nan, numpy.nan, numpy.nan, numpy.nan, numpy.nan)
hkl, beta = self.calc.hkl(real_pos)
return self.PseudoPosition(*hkl, *beta)
class X04V(Diffractometer):
"""
X04SA diffractometer in vertical geometry
"""
calc_class = CalcX04V
ov = Cpt(EpicsMotor, 'X04SA-ES3-XPS:OMEGAV', name='ov')
alp = Cpt(EpicsMotor, 'X04SA-ES3-XPS:ALPHA', name='alp')
delta = Cpt(EpicsMotor, 'X04SA-ES3-XPS:DELTA', name='delta')
gam = Cpt(EpicsMotor, 'X04SA-ES3-XPS:GAMMA', name='gam')
nu = Cpt(EpicsMotor, 'X04SA-ES3-XPS:NU', name='nu')
class X04H(Diffractometer):
"""
X04SA diffractometer in horizontal geometry
"""
calc_class = CalcX04H
phi = Cpt(EpicsMotor, 'X04SA-ES3-XPS:PHI', name='phi')
oh = Cpt(EpicsMotor, 'X04SA-ES3-XPS:OMEGAH', name='oh')
delta = Cpt(EpicsMotor, 'X04SA-ES3-XPS:DELTA', name='delta')
gam = Cpt(EpicsMotor, 'X04SA-ES3-XPS:GAMMA', name='gam')
nu = Cpt(EpicsMotor, 'X04SA-ES3-XPS:NU', name='nu')

View File

View File

@@ -0,0 +1 @@
from .hkl_scan import HklScan

View File

@@ -0,0 +1,72 @@
"""
SCAN PLUGINS
All new scans should be derived from ScanBase. ScanBase provides various methods that can be customized and overriden
but they are executed in a specific order:
- self.initialize # initialize the class if needed
- self.read_scan_motors # used to retrieve the start position (and the relative position shift if needed)
- self.prepare_positions # prepare the positions for the scan. The preparation is split into multiple sub fuctions:
- self._calculate_positions # calculate the positions
- self._set_positions_offset # apply the previously retrieved scan position shift (if needed)
- self._check_limits # tests to ensure the limits won't be reached
- self.open_scan # send an open_scan message including the scan name, the number of points and the scan motor names
- self.stage # stage all devices for the upcoming acquisiton
- self.run_baseline_readings # read all devices to get a baseline for the upcoming scan
- self.pre_scan # perform additional actions before the scan starts
- self.scan_core # run a loop over all position
- self._at_each_point(ind, pos) # called at each position with the current index and the target positions as arguments
- self.finalize # clean up the scan, e.g. move back to the start position; wait everything to finish
- self.unstage # unstage all devices that have been staged before
- self.cleanup # send a close scan message and perform additional cleanups if needed
"""
import time
import numpy
# from bec_lib.endpoints import MessageEndpoints
from typing import Literal
from bec_lib.devicemanager import DeviceManagerBase
from bec_lib.logger import bec_logger
# from bec_lib import messages
# from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import RequestBase, ScanArgType, ScanBase
logger = bec_logger.logger
class HklScan(ScanBase):
scan_name = 'hkl_scan'
required_kwargs = ['exp_time']
arg_input = {
'device': ScanArgType.DEVICE,
'start': ScanArgType.LIST,
'stop': ScanArgType.LIST,
'points': ScanArgType.INT
}
arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None}
def __init__(self, diffract, start, stop, points, **kwargs):
self.diffract = diffract
self.start = start
self.stop = stop
self.points = points
super().__init__(**kwargs)
self.scan_report_devices = ['h', 'k', 'l'] + self.scan_motors + self.readout_priority['monitored']
def _get_scan_motors(self):
self.scan_motors = self.device_manager.devices[self.diffract].real_axes.get()
def prepare_positions(self):
"""
Override base method to yield from _calculate_position method
"""
yield from self._calculate_positions()
self._optimize_trajectory()
self.num_pos = len(self.positions) * self.burst_at_each_point
yield from self._set_position_offset()
self._check_limits()
def _calculate_positions(self):
hkls = numpy.linspace(self.start, self.stop, self.points).tolist()
self.positions = yield from self.stubs.send_rpc_and_wait(self.diffract, 'angles_from_hkls', hkls)

View File

74
pyproject.toml Normal file
View File

@@ -0,0 +1,74 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "addams_bec"
version = "0.0.0"
description = "Custom device implementations based on the ophyd hardware abstraction layer"
requires-python = ">=3.10"
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = []
[project.optional-dependencies]
dev = [
"black",
"isort",
"coverage",
"pylint",
"pytest",
"pytest-random-order",
"ophyd_devices",
"bec_server",
]
[project.entry-points."bec"]
plugin_bec = "addams_bec"
[project.entry-points."bec.deployment.device_server"]
plugin_ds_startup = "addams_bec.deployments.device_server.startup:run"
[project.entry-points."bec.file_writer"]
plugin_file_writer = "addams_bec.file_writer"
[project.entry-points."bec.scans"]
plugin_scans = "addams_bec.scans"
[project.entry-points."bec.ipython_client_startup"]
plugin_ipython_client_pre = "addams_bec.bec_ipython_client.startup.pre_startup"
plugin_ipython_client_post = "addams_bec.bec_ipython_client.startup"
[project.entry-points."bec.widgets.auto_updates"]
plugin_widgets_update = "addams_bec.bec_widgets.auto_updates:PlotUpdate"
[project.entry-points."bec.widgets.user_widgets"]
plugin_widgets = "addams_bec.bec_widgets.widgets"
[tool.hatch.build.targets.wheel]
include = ["*"]
[tool.isort]
profile = "black"
line_length = 100
multi_line_output = 3
include_trailing_comma = true
[tool.black]
line-length = 100
skip-magic-trailing-comma = true
[tool.pylint.basic]
# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
good-names-rgxs = [
".*scanID.*",
".*RID.*",
".*pointID.*",
".*ID.*",
".*_2D.*",
".*_1D.*",
]

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,72 @@
import numpy
from ophyd import PseudoSingle, SoftPositioner, Component as Cpt
from addams_bec.devices.diffract import X04V, X04H
class SimulatedX04V(X04V):
ov = Cpt(SoftPositioner, limits=(-180, 180), init_pos=0., kind="normal")
alp = Cpt(SoftPositioner, limits=(-180, 180), init_pos=0., kind="normal")
delta = Cpt(SoftPositioner, limits=(-180, 180), init_pos=0., kind="normal")
gam = Cpt(SoftPositioner, limits=(-180, 180), init_pos=0., kind="normal")
nu = Cpt(SoftPositioner, limits=(-180, 180), init_pos=0, kind="normal")
class SimulatedX04H(X04H):
phi = Cpt(SoftPositioner, limits=(-180, 180), init_pos=0., kind="normal")
oh = Cpt(SoftPositioner, limits=(-180, 180), init_pos=0., kind="normal")
delta = Cpt(SoftPositioner, limits=(-180, 180), init_pos=0., kind="normal")
gam = Cpt(SoftPositioner, limits=(-180, 180), init_pos=0., kind="normal")
nu = Cpt(SoftPositioner, limits=(-180, 180), init_pos=0., kind="normal")
def test_UB_x04v():
x04v = SimulatedX04V(prefix='', name="x04v")
# Compute UB matrix
x04v.calc.lattice = (5.43, 5.5, 7.71, 90, 90, 90)
x04v.calc.wavelength = 0.71
x04v.calc.add_reflection(1, -1, 2, numpy.array([ 88.2400, 10, 8.7000, 12.3200]))
# Compute UB matrix
x04v.calc.lattice = (5.43, 5.5, 7.71, 90, 90, 90)
x04v.calc.wavelength = 0.71
x04v.calc.add_reflection(1, -1, 2, numpy.array([ 88.2400, 10, 8.7000, 12.3200]))
x04v.calc.add_reflection(2, -2, 0, numpy.array([ 35.3500, 10, 17.3502, 12.4000]))
x04v.calc.add_reflection(2, -2, 4, numpy.array([101.4501, 10, 17.1200, 25.3300]))
x04v.calc.add_reflection(6, -2, 4, numpy.array([ 95.6500, 10, 46.1300, 31.6600]))
x04v.calc.add_reflection(1, -3, 1, numpy.array([ 16.2754, 10, 11.2367, 21.6627]))
x04v.calc.compute_UB()
UBspec = numpy.array([
[0.58641, -0.56745, -0.56980],
[0.99701, 0.35123, 0.34255],
[0.00667, -0.93809, 0.47113],
])
assert(numpy.allclose(UBspec, x04v.calc.UB, atol=1e-4))
x04v.move(h=1)
def test_UB_x04h():
x04h = SimulatedX04H(prefix='', name='x04h')
x04h.calc.lattice = (2.989, 2.989, 2.989, 90, 90, 90)
x04h.calc.wavelength = 1.631
x04h.calc.add_reflection(1, 1, 1, numpy.array([ -1.0000, 5.0000, 30.5758, 50.0038]))
x04h.calc.add_reflection(1, -1, -1, numpy.array([ 89.2000, 5.0000, 30.5758, 50.0038]))
x04h.calc.add_reflection(1, -1, 1, numpy.array([179.1800, 5.0000, 30.5758, 50.0038]))
x04h.calc.compute_UB()
UBspec = numpy.array([
[-2.04607, -2.51605, 2.04855],
[ 0.46587, -1.58340, -0.46982],
[ 2.10211, 0.00000, 0.00000],
])
assert(numpy.allclose(UBspec, x04h.calc.UB, atol=1e-4))
x04h.move(h=1)

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).