7 Commits

Author SHA1 Message Date
4ac8d316fb chore/actions_test
All checks were successful
CI for addams_bec / test (push) Successful in 34s
CI for addams_bec / test (pull_request) Successful in 33s
2025-09-11 15:36:47 +02:00
37bfc1c2a2 Merge branch 'update_copier_template' into 'main'
feat: update repository with copier changes for gitea migration

See merge request bec/addams_bec!4
2025-09-11 15:16:27 +02:00
63ded2bf7d feat: update repository with copier changes for gitea migration 2025-09-11 15:10:02 +02:00
0c3e7acea6 Merge branch 'chore/migrate_to_copier' into 'main'
chore: migrate to new plugin template

See merge request bec/addams_bec!2
2025-06-02 14:03:54 +02:00
e724ff4869 chore: migrate to new plugin template 2025-06-02 13:16:20 +02:00
2e52f6b274 Merge branch 'feature/energy_optimizer' into 'main'
Feature/energy optimizer

See merge request bec/addams_bec!1
2025-05-24 11:26:12 +02:00
dbda93ee12 feat: add EnergyOptimizer class with transition step calculation and tests 2025-05-24 11:24:05 +02:00
27 changed files with 402 additions and 610 deletions

9
.copier-answers.yml Normal file
View File

@@ -0,0 +1,9 @@
# Do not edit this file!
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.2.1
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: addams_bec
widget_plugins_input: []

97
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,97 @@
name: CI for addams_bec
on:
push:
pull_request:
workflow_dispatch:
inputs:
BEC_WIDGETS_BRANCH:
description: "Branch of BEC Widgets to install"
required: false
type: string
default: "main"
BEC_CORE_BRANCH:
description: "Branch of BEC Core to install"
required: false
type: string
default: "main"
OPHYD_DEVICES_BRANCH:
description: "Branch of Ophyd Devices to install"
required: false
type: string
default: "main"
BEC_PLUGIN_REPO_BRANCH:
description: "Branch of the BEC Plugin Repository to install"
required: false
type: string
default: "main"
PYTHON_VERSION:
description: "Python version to use"
required: false
type: string
default: "3.11"
permissions:
pull-requests: write
jobs:
test:
runs-on: ubuntu-latest
env:
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
- name: Checkout BEC Core
uses: actions/checkout@v4
with:
repository: bec/bec
ref: "${{ inputs.BEC_CORE_BRANCH || 'main' }}"
path: ./bec
- name: Checkout Ophyd Devices
uses: actions/checkout@v4
with:
repository: bec/ophyd_devices
ref: "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}"
path: ./ophyd_devices
- name: Checkout BEC Widgets
uses: actions/checkout@v4
with:
repository: bec/bec_widgets
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
path: ./bec_widgets
- name: Checkout BEC Plugin Repository
uses: actions/checkout@v4
with:
repository: bec/addams_bec
ref: "${{ inputs.BEC_PLUGIN_REPO_BRANCH || github.head_ref || github.sha }}"
path: ./addams_bec
- name: Install dependencies
shell: bash
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
- name: Install Python dependencies
shell: bash
run: |
pip install uv
uv pip install --system -e ./ophyd_devices
uv pip install --system -e ./bec/bec_lib[dev]
uv pip install --system -e ./bec/bec_ipython_client
uv pip install --system -e ./bec/bec_server[dev]
uv pip install --system -e ./bec_widgets[dev,pyside6]
uv pip install --system -e ./addams_bec
- name: Run Pytest with Coverage
id: coverage
run: pytest --random-order --cov=./addams_bec --cov-config=./addams_bec/pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail ./addams_bec/tests/

View File

@@ -1,4 +1,7 @@
include:
- file: /templates/plugin-repo-template.yml
inputs: {name: addams_bec, target: addams_bec}
inputs:
name: addams_bec
target: addams_bec
branch: $CHILD_PIPELINE_BRANCH
project: bec/awi_utils

View File

@@ -1,6 +1,7 @@
BSD 3-Clause License
Copyright (c) 2024, Paul Scherrer Institute
Copyright (c) 2025, Paul Scherrer Institute
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
@@ -25,4 +26,4 @@ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
from typing import TYPE_CHECKING, List, Tuple
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import DeviceManagerBase as DeviceManager
class EnergyOptimizer:
def __init__(self, device_manager: DeviceManager):
self.device_manager = device_manager
self.strips = {
"Si": {"energy_range": [5, 10]},
"Rh": {"energy_range": [8, 23]},
"Pt": {"energy_range": [20, 40]},
}
self.overlap_energyies = {"Si": {"Rh": 9}, "Rh": {"Si": 9, "Pt": 22}, "Pt": {"Rh": 22}}
@staticmethod
def get_transition_steps(start_energy: float, target_energy: float) -> List[Tuple[float, str]]:
"""
Get the required steps to transition from one energy to another.
Args:
start_energy: The starting energy in keV.
target_energy: The target energy in keV.
Returns:
A list of tuples containing the energy and the strip name for each step.
"""
strips = {"Si": (5, 10), "Rh": (8, 23), "Pt": (20, 40)}
overlap_energyies = {"Si": {"Rh": 9}, "Rh": {"Si": 9, "Pt": 22}, "Pt": {"Rh": 22}}
def get_strip(energy: float) -> str:
for strip, (low, high) in strips.items():
if low <= energy <= high:
return strip
return ""
def find_overlap(from_strip: str, to_strip: str) -> float:
return overlap_energyies[from_strip][to_strip]
path = []
if get_strip(start_energy) == "":
raise ValueError("Start energy is out of range for available strips")
if not any(low <= target_energy <= high for low, high in strips.values()):
raise ValueError("End energy is out of range for available strips")
current_energy = start_energy
# TODO: this should be replaced with a readout from the PV
current_strip = get_strip(current_energy)
# if the target energy is covered by the current strip, return the path
if strips[current_strip][0] <= target_energy <= strips[current_strip][1]:
return [(target_energy, current_strip)]
target_strip = get_strip(target_energy)
available_strips = list(strips.keys())
current_index = available_strips.index(current_strip)
target_index = available_strips.index(target_strip)
step = 1 if target_index > current_index else -1
for i in range(current_index, target_index, step):
next_strip = available_strips[i + step]
overlap_energy = find_overlap(available_strips[i], next_strip)
path.append((overlap_energy, next_strip))
current_strip = next_strip
path.append((target_energy, target_strip))
return path
if __name__ == "__main__": # pragma: no cover
from unittest.mock import MagicMock
device_manager = MagicMock()
optimizer = EnergyOptimizer(device_manager)
steps = optimizer.get_transition_steps(30, 20)
print(steps)

View File

@@ -1,10 +1,14 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to add additional command line arguments.
is started. It can be used to add additional command line arguments.
"""
import os
from bec_lib.service_config import ServiceConfig
import addams_bec
def extend_command_line_args(parser):
"""
@@ -15,9 +19,13 @@ def extend_command_line_args(parser):
return parser
# def get_config() -> ServiceConfig:
# """
# Create and return the service configuration.
# """
# return ServiceConfig(redis={"host": "localhost", "port": 6379})
def get_config() -> ServiceConfig:
"""
Create and return the ServiceConfig for the plugin repository
"""
deployment_path = os.path.dirname(os.path.dirname(os.path.dirname(addams_bec.__file__)))
files = os.listdir(deployment_path)
if "bec_config.yaml" in files:
return ServiceConfig(config_path=os.path.join(deployment_path, "bec_config.yaml"))
else:
return ServiceConfig(redis={"host": "localhost", "port": 6379})

View File

@@ -1,5 +1,3 @@
import os
from ophyd import (
ADComponent as ADCpt,
Device,
@@ -53,14 +51,6 @@ class Eiger500KSetup(CustomDetectorMixin):
self.parent.cam.acquire.put(1, wait=False) # arm
# file writer
# TODO: enable this when new storage path is consistent on bec server and consoles.
# self.parent.filepath.set(
# self.parent.filewriter.compile_full_filename(f"{self.parent.name}")
# ).wait()
# file_path, file_name = os.path.split(self.parent.filepath.get())
# self.parent.hdf.file_path.put(file_path)
# self.parent.hdf.file_name.put(file_name)
# self.parent.hdf.file_template.put("%s%s")
self.parent.hdf.lazy_open.put(1)
self.parent.hdf.num_capture.put(num_points)
self.parent.hdf.file_write_mode.put(2) # Stream
@@ -109,8 +99,7 @@ class Eiger500KSetup(CustomDetectorMixin):
# publish timeseries data
metadata = self.parent.scaninfo.scan_msg.metadata
metadata.update({"async_update": "append", "max_shape": [None, None]})
metadata.update({"async_update": "append", "num_lines": self.parent.roistat.ts_current_point.get()})
msg = messages.DeviceMessage(
signals={
self.parent.roistat.roi1.name_.get(): {

View File

@@ -143,11 +143,11 @@ class ProfileMoveAxisXPS(ProfileMoveAxis):
class ProfileMoveXPSX04SA(ProfileMoveControllerXPS):
ov = Cpt(ProfileMoveAxisXPS, '', motor='1')
xv = Cpt(ProfileMoveAxisXPS, '', motor='2')
oh = Cpt(ProfileMoveAxisXPS, '', motor='3')
phi = Cpt(ProfileMoveAxisXPS, '', motor='4')
nu = Cpt(ProfileMoveAxisXPS, '', motor='5')
alp = Cpt(ProfileMoveAxisXPS, '', motor='6')
gam = Cpt(ProfileMoveAxisXPS, '', motor='7')
delta = Cpt(ProfileMoveAxisXPS, '', motor='8')
alp = Cpt(ProfileMoveAxisXPS, '', motor='1')
delta = Cpt(ProfileMoveAxisXPS, '', motor='2')
gam = Cpt(ProfileMoveAxisXPS, '', motor='3')
ov = Cpt(ProfileMoveAxisXPS, '', motor='4')
xv = Cpt(ProfileMoveAxisXPS, '', motor='5')
phi = Cpt(ProfileMoveAxisXPS, '', motor='6')
oh = Cpt(ProfileMoveAxisXPS, '', motor='7')
nu = Cpt(ProfileMoveAxisXPS, '', motor='8')

View File

@@ -0,0 +1,6 @@
# Macros
This directory is intended to store macros which will be loaded automatically when starting BEC.
Macros are small functions to make repetitive tasks easier. Functions defined in python files in this directory will be accessible from the BEC console.
Please do not put any code outside of function definitions here. If you wish for code to be automatically run when starting BEC, see the startup script at addams_bec/bec_ipython_client/startup/post_startup.py
For a guide on writing macros, please see: https://bec.readthedocs.io/en/latest/user/command_line_interface.html#how-to-write-a-macro

View File

View File

@@ -1,4 +1,2 @@
from .hkl_scan import HklScan
from .fly_scan import HklFlyScan
from .mesh_scan import HklMeshScan, HklMeshFlyScan
from .profilemove_scan import LineProfileMove, GridProfileMove
from .hkl_scan import HklScan

View File

@@ -1,4 +1,3 @@
import numpy
from bec_lib import messages
@@ -53,7 +52,7 @@ class HklFlyScan(AsyncFlyScanBase):
self._optimize_trajectory()
self.num_pos = len(self.positions) * self.burst_at_each_point
yield from self._set_position_offset()
self._check_limits()
self._check_limits(
def _calculate_positions(self):
hkls = numpy.linspace(self.start, self.stop, self.points).tolist()
@@ -64,7 +63,6 @@ class HklFlyScan(AsyncFlyScanBase):
self.positions.append(position[:-2])
def scan_core(self):
all_motors = self.device_manager.devices[self.controller].axes.get()
scan_motors = self.device_manager.devices[self.diffract].real_axes.get()
hkls = numpy.linspace(self.start, self.stop, self.points).tolist()
positions = yield from self.stubs.send_rpc_and_wait(self.diffract, 'angles_from_hkls', hkls)
@@ -81,15 +79,11 @@ class HklFlyScan(AsyncFlyScanBase):
yield from self.stubs.send_rpc_and_wait(self.controller, 'time_mode.put', 1)
yield from self.stubs.send_rpc_and_wait(self.controller, 'times.put', [total_time, total_time])
for axis_name in all_motors:
if axis_name not in scan_motors:
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 0)
else:
index = scan_motors.index(axis_name)
yield from self.stubs.send_rpc_and_wait(self.controller,
f'{axis_name}.positions.put',
(positions[0, index], positions[-1, index]))
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
for index, axis_name in enumerate(scan_motors):
yield from self.stubs.send_rpc_and_wait(self.controller,
f'{axis_name}.positions.put',
(positions[0, index], positions[-1, index]))
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
else:
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_points.put', num_points)
@@ -99,15 +93,11 @@ class HklFlyScan(AsyncFlyScanBase):
yield from self.stubs.send_rpc_and_wait(self.controller, 'time_mode.put', 0)
yield from self.stubs.send_rpc_and_wait(self.controller, 'fixed_time.put', self.exp_time)
for axis_name in all_motors:
if axis_name not in scan_motors:
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 0)
else:
index = scan_motors.index(axis_name)
yield from self.stubs.send_rpc_and_wait(self.controller,
f'{axis_name}.positions.put',
(positions[0, index], positions[-1, index]))
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
for index, axis_name in enumerate(scan_motors):
yield from self.stubs.send_rpc_and_wait(self.controller,
f'{axis_name}.positions.put',
positions[:, index])
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
yield from self.stubs.send_rpc_and_wait(self.controller, 'build_profile')
build_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'build_status.get')
@@ -145,7 +135,7 @@ class HklFlyScan(AsyncFlyScanBase):
logger.success(f'{self.scan_name} finished')
def _publish_readbacks(self, device, readbacks):
metadata = {"async_update": "append", "max_shape": [None, None]}
metadata = {"async_update": "append", "num_lines": len(readbacks)}
msg = messages.DeviceMessage(
signals={device: {'value': readbacks} }, metadata=metadata
)

View File

@@ -1,266 +0,0 @@
import itertools
import numpy
# from bec_lib.endpoints import MessageEndpoints
from typing import Literal
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.devicemanager import DeviceManagerBase
from bec_lib.logger import bec_logger
# from bec_lib import messages
# from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import ScanAbortion, ScanArgType, ScanBase, AsyncFlyScanBase
logger = bec_logger.logger
class HklMeshScan(ScanBase):
scan_name = 'hklmesh_scan'
arg_input = {
'index': ScanArgType.DEVICE,
'start': ScanArgType.FLOAT,
'stop': ScanArgType.FLOAT,
'points': ScanArgType.INT
}
arg_bundle_size = {"bundle": len(arg_input), "min": 2, "max": 2}
required_kwargs = ['diffract', 'exp_time']
def __init__(self, *args, diffract: str, **kwargs):
self.diffract = diffract
super().__init__(**kwargs)
if any(m not in ['h', 'k', 'l'] for m in self.caller_args):
raise ValueError("Invalid name. Must be 'h', 'k', or 'l'.")
if len(self.caller_args) != 2:
raise ValueError("Only 2 names can be given.")
self.scan_report_devices = ['h', 'k', 'l'] + self.scan_motors + self.readout_priority['monitored']
def update_scan_motors(self):
self.scan_motors = self.device_manager.devices[self.diffract].real_axes.get()
def prepare_positions(self):
"""
Override base method to yield from _calculate_position method
"""
yield from self._calculate_positions()
self._optimize_trajectory()
self.num_pos = len(self.positions) * self.burst_at_each_point
yield from self._set_position_offset()
self._check_limits()
def _calculate_positions(self):
inner_name, outer_name = [x for x in self.caller_args if x in ['h','k','l']]
fixed_name = [x for x in ['h', 'k', 'l'] if x not in self.caller_args][0]
fixed_value = yield from self.stubs.send_rpc_and_wait(self.diffract, f'{fixed_name}.position')
print(f"{inner_name=}, {outer_name=}, {fixed_name=}")
inner_ind, outer_ind, fixed_ind = (['h', 'k', 'l'].index(x) for x in [inner_name, outer_name, fixed_name])
hkls = []
outer_start, outer_stop, outer_points = self.caller_args[outer_name]
inner_start, inner_stop, inner_points = self.caller_args[inner_name]
outer_vect = numpy.linspace(outer_start, outer_stop, outer_points, dtype=float)
for i, outer in enumerate(outer_vect):
if i % 2 == 0:
inner_vect = numpy.linspace(inner_start, inner_stop, inner_points, dtype=float)
else:
inner_vect = numpy.linspace(inner_stop, inner_start, inner_points, dtype=float)
for inner in inner_vect:
hkl = [0, 0, 0]
hkl[inner_ind] = inner
hkl[outer_ind] = outer
hkl[fixed_ind] = fixed_value
hkls.append(hkl)
positions = yield from self.stubs.send_rpc_and_wait(self.diffract, 'angles_from_hkls', hkls)
# the last two positions 'betaIn' and 'betaOut' are not real motors
self.positions = []
for position in positions:
self.positions.append(position[:-2])
class HklMeshFlyScan(AsyncFlyScanBase):
scan_name = 'hklmesh_flyscan'
scan_type = 'fly'
arg_input = {
'index': ScanArgType.DEVICE,
'start': ScanArgType.FLOAT,
'stop': ScanArgType.FLOAT,
'points': ScanArgType.INT
}
arg_bundle_size = {"bundle": len(arg_input), "min": 2, "max": 2}
required_kwargs = ['diffract', 'controller']
use_scan_progress_report = False
def __init__(self, *args, diffract: str, controller: str, **kwargs):
self.diffract = diffract
self.controller = controller
super().__init__(**kwargs)
if any(m not in ['h', 'k', 'l'] for m in self.caller_args):
raise ValueError("Invalid name. Must be 'h', 'k', or 'l'.")
if len(self.caller_args) != 2:
raise ValueError("Only 2 names can be given.")
self.scan_report_devices = ['h', 'k', 'l'] + self.scan_motors + self.readout_priority['monitored']
@property
def monitor_sync(self):
return self.diffract
def update_scan_motors(self):
self.scan_motors = self.device_manager.devices[self.diffract].real_axes.get()
def update_readout_priority(self):
self.readout_priority['async'].extend(['h', 'k', 'l'])
self.readout_priority['async'].extend(self.scan_motors)
def prepare_positions(self):
"""
Override base method to yield from _calculate_position method
"""
yield from self._calculate_positions()
self._optimize_trajectory()
self.num_pos = len(self.positions) * self.burst_at_each_point
yield from self._set_position_offset()
self._check_limits()
def _calculate_positions(self):
inner_name, outer_name = [x for x in self.caller_args if x in ['h','k','l']]
fixed_name = [x for x in ['h', 'k', 'l'] if x not in self.caller_args][0]
fixed_value = yield from self.stubs.send_rpc_and_wait(self.diffract, f'{fixed_name}.position')
inner_ind, outer_ind, fixed_ind = (['h', 'k', 'l'].index(x) for x in [inner_name, outer_name, fixed_name])
hkls = []
outer_start, outer_stop, outer_points = self.caller_args[outer_name]
inner_start, inner_stop, inner_points = self.caller_args[inner_name]
outer_vect = numpy.linspace(outer_start, outer_stop, outer_points, dtype=float)
for i, outer in enumerate(outer_vect):
if i % 2 == 0:
inner_vect = numpy.linspace(inner_start, inner_stop, inner_points, dtype=float)
else:
inner_vect = numpy.linspace(inner_stop, inner_start, inner_points, dtype=float)
for inner in inner_vect:
hkl = [0, 0, 0]
hkl[inner_ind] = inner
hkl[outer_ind] = outer
hkl[fixed_ind] = fixed_value
hkls.append(hkl)
positions = yield from self.stubs.send_rpc_and_wait(self.diffract, 'angles_from_hkls', hkls)
# the last two positions 'betaIn' and 'betaOut' are not real motors
self.positions = []
for position in positions:
self.positions.append(position[:-2])
def scan_core(self):
all_motors = self.device_manager.devices[self.controller].axes.get()
inner_name, outer_name = [x for x in self.caller_args if x in ['h','k','l']]
fixed_name = [x for x in ['h', 'k', 'l'] if x not in self.caller_args][0]
fixed_value = yield from self.stubs.send_rpc_and_wait(self.diffract, f'{fixed_name}.position')
inner_ind, outer_ind, fixed_ind = (['h', 'k', 'l'].index(x) for x in [inner_name, outer_name, fixed_name])
num_pos = 0
hkls = []
outer_start, outer_stop, outer_points = self.caller_args[outer_name]
inner_start, inner_stop, inner_points = self.caller_args[inner_name]
outer_vect = numpy.linspace(outer_start, outer_stop, outer_points, dtype=float)
for i, outer in enumerate(outer_vect):
if i % 2 == 0:
start, stop = inner_start, inner_stop
else:
start, stop = inner_stop, inner_start
hkls = [[0, 0, 0], [0, 0, 0]]
hkls[0][inner_ind] = start
hkls[0][outer_ind] = outer
hkls[0][fixed_ind] = fixed_value
hkls[1][inner_ind] = stop
hkls[1][outer_ind] = outer
hkls[1][fixed_ind] = fixed_value
positions = yield from self.stubs.send_rpc_and_wait(self.diffract, 'angles_from_hkls', hkls)
# Move motors to start position, the last two positions 'betaIn' and 'betaOut' are not real motors
yield from self.stubs.set(device=self.scan_motors, value=positions[0][:-2])
inner_time = inner_points * self.exp_time
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_points.put', 2)
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_pulses.put', inner_points)
yield from self.stubs.send_rpc_and_wait(self.controller, 'start_pulses.put', 1)
yield from self.stubs.send_rpc_and_wait(self.controller, 'end_pulses.put', 2)
yield from self.stubs.send_rpc_and_wait(self.controller, 'time_mode.put', 1)
print(f"{inner_time=} {inner_points=} {self.exp_time=}")
yield from self.stubs.send_rpc_and_wait(self.controller, 'times.put', [inner_time, inner_time])
for axis_name in all_motors:
if axis_name not in self.scan_motors:
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 0)
else:
index = self.scan_motors.index(axis_name)
yield from self.stubs.send_rpc_and_wait(self.controller,
f'{axis_name}.positions.put',
(positions[0][index], positions[-1][index]))
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
yield from self.stubs.send_rpc_and_wait(self.controller, 'build_profile')
build_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'build_status.get')
if build_status != 1:
raise ScanAbortion('Profile build failed')
yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_profile')
execute_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_status.get')
if execute_status != 1:
raise ScanAbortion('Profile execute failed')
yield from self.stubs.send_rpc_and_wait(self.controller, 'readback_profile')
readback_status = yield from self.stubs.send_rpc_and_wait(self.controller,'readback_status.get')
if readback_status != 1:
raise ScanAbortion('Profile readback failed')
angle_readbacks = []
for index, axis_name in enumerate(self.scan_motors):
readbacks = yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.readbacks.get')
self._publish_readbacks(axis_name, readbacks)
angle_readbacks.append(readbacks)
# motor readbacks are aranged column-wise
angle_readbacks = [list(x) for x in zip(*angle_readbacks)]
hkls = yield from self.stubs.send_rpc_and_wait(self.diffract, 'hkls_from_angles', angle_readbacks)
hkls = numpy.array(hkls)
for index, name in enumerate(['h', 'k', 'l', 'betaIn', 'betaOut']):
self._publish_readbacks(name, hkls[:, index])
# motor readbacks have more points than generated pulses
num_pos += len(angle_readbacks)
self.num_pos = num_pos
logger.success(f'{self.scan_name} finished')
def _publish_readbacks(self, device, readbacks):
metadata = {"async_update": "append", "max_shape": [None, None]}
msg = messages.DeviceMessage(
signals={device: {'value': readbacks} }, metadata=metadata
)
self.stubs.connector.xadd(
topic=MessageEndpoints.device_async_readback(
scan_id=self.scan_id, device=device
),
msg_dict={"data": msg},
expire=1800,
)

View File

@@ -0,0 +1,12 @@
# from .metadata_schema_template import ExampleSchema
METADATA_SCHEMA_REGISTRY = {
# Add models which should be used to validate scan metadata here.
# Make a model according to the template, and import it as above
# Then associate it with a scan like so:
# "example_scan": ExampleSchema
}
# Define a default schema type which should be used as the fallback for everything:
DEFAULT_SCHEMA = None

View File

@@ -0,0 +1,34 @@
# # By inheriting from BasicScanMetadata you can define a schema by which metadata
# # supplied to a scan must be validated.
# # This schema is a Pydantic model: https://docs.pydantic.dev/latest/concepts/models/
# # but by default it will still allow you to add any arbitrary information to it.
# # That is to say, when you run a scan with which such a model has been associated in the
# # metadata_schema_registry, you can supply any python dictionary with strings as keys
# # and built-in python types (strings, integers, floats) as values, and these will be
# # added to the experiment metadata, but it *must* contain the keys and values of the
# # types defined in the schema class.
# #
# #
# # For example, say that you would like to enforce recording information about sample
# # pretreatment, you could define the following:
# #
#
# from bec_lib.metadata_schema import BasicScanMetadata
#
#
# class ExampleSchema(BasicScanMetadata):
# treatment_description: str
# treatment_temperature_k: int
#
#
# # If this was used according to the example in metadata_schema_registry.py,
# # then when calling the scan, the user would need to write something like:
# >>> scans.example_scan(
# >>> motor,
# >>> 1,
# >>> 2,
# >>> 3,
# >>> metadata={"treatment_description": "oven overnight", "treatment_temperature_k": 575},
# >>> )
#
# # And the additional metadata would be saved in the HDF5 file created for the scan.

View File

@@ -1,233 +0,0 @@
import numpy
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.devicemanager import DeviceManagerBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import ScanAbortion, ScanArgType, ScanBase, AsyncFlyScanBase
logger = bec_logger.logger
class LineProfileMove(AsyncFlyScanBase):
scan_name = 'line_profilemove'
scan_type = 'fly'
arg_input = {
'motor': ScanArgType.DEVICE,
'start': ScanArgType.FLOAT,
'stop': ScanArgType.FLOAT,
}
arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None}
required_kwargs = ['controller', 'points']
use_scan_progress_report = False
def __init__(self, *args, controller: str, points: int, **kwargs):
self.controller = controller
self.points = self.num_pos = points
super().__init__(**kwargs)
self.all_motors = self.device_manager.devices[self.controller].axes.get()
if not all(m in self.all_motors for m in self.scan_motors):
raise ValueError(f"Scan motors {self.scan_motors} not in {self.all_motors}")
def update_readout_priority(self):
self.readout_priority['async'].extend(self.scan_motors)
def update_scan_motors(self):
ScanBase.update_scan_motors(self)
def prepare_positions(self):
"""prepare the positions for the scan"""
self._calculate_positions()
self._optimize_trajectory()
self.num_pos = len(self.positions) * self.burst_at_each_point
yield from self._set_position_offset()
self._check_limits()
def _calculate_positions(self):
axis = []
for _, val in self.caller_args.items():
ax_pos = numpy.linspace(val[0], val[1], self.points, dtype=float)
axis.append(ax_pos)
self.positions = numpy.array(list(zip(*axis)), dtype=float)
def scan_core(self):
total_time = self.exp_time * self.points
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_points.put', 2)
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_pulses.put', self.points)
yield from self.stubs.send_rpc_and_wait(self.controller, 'start_pulses.put', 1)
yield from self.stubs.send_rpc_and_wait(self.controller, 'end_pulses.put', 2)
yield from self.stubs.send_rpc_and_wait(self.controller, 'time_mode.put', 1)
yield from self.stubs.send_rpc_and_wait(self.controller, 'times.put', [total_time, total_time])
for axis_name in self.all_motors:
if axis_name not in self.scan_motors:
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 0)
else:
index = self.scan_motors.index(axis_name)
yield from self.stubs.send_rpc_and_wait(self.controller,
f'{axis_name}.positions.put',
(self.caller_args[axis_name][0], self.caller_args[axis_name][1]))
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
yield from self.stubs.send_rpc_and_wait(self.controller, 'build_profile')
build_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'build_status.get')
if build_status != 1:
raise ScanAbortion('Profile build failed')
yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_profile')
execute_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_status.get')
if execute_status != 1:
raise ScanAbortion('Profile execute failed')
yield from self.stubs.send_rpc_and_wait(self.controller, 'readback_profile')
readback_status = yield from self.stubs.send_rpc_and_wait(self.controller,'readback_status.get')
if readback_status != 1:
raise ScanAbortion('Profile readback failed')
for index, axis_name in enumerate(self.scan_motors):
readbacks = yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.readbacks.get')
self._publish_readbacks(axis_name, readbacks)
logger.success(f'{self.scan_name} finished')
def _publish_readbacks(self, device, readbacks):
metadata = {"async_update": "append", "max_shape": [None, None]}
msg = messages.DeviceMessage(
signals={device: {'value': readbacks} }, metadata=metadata
)
self.stubs.connector.xadd(
topic=MessageEndpoints.device_async_readback(
scan_id=self.scan_id, device=device
),
msg_dict={"data": msg},
expire=1800,
)
class GridProfileMove(AsyncFlyScanBase):
scan_name = 'grid_profilemove'
scan_type = 'fly'
arg_input = {
'motor': ScanArgType.DEVICE,
'start': ScanArgType.FLOAT,
'stop': ScanArgType.FLOAT,
'points': ScanArgType.INT,
}
arg_bundle_size = {"bundle": len(arg_input), "min": 2, "max": 2}
required_kwargs = ['controller']
use_scan_progress_report = False
def __init__(self, *args, controller: str, **kwargs):
self.controller = controller
super().__init__(**kwargs)
self.all_motors = self.device_manager.devices[self.controller].axes.get()
if not all(m in self.all_motors for m in self.scan_motors):
raise ValueError(f"Scan motors {self.scan_motors} not in {self.all_motors}")
def update_readout_priority(self):
self.readout_priority['async'].extend(self.scan_motors)
def update_scan_motors(self):
ScanBase.update_scan_motors(self)
def prepare_positions(self):
"""prepare the positions for the scan"""
self._calculate_positions()
self._optimize_trajectory()
self.num_pos = len(self.positions) * self.burst_at_each_point
yield from self._set_position_offset()
self._check_limits()
def _calculate_positions(self):
inner_motor, outer_motor = self.scan_motors
outer_start, outer_stop, outer_points = self.caller_args[outer_motor]
inner_start, inner_stop, inner_points = self.caller_args[inner_motor]
self.positions = []
outer_vect = numpy.linspace(outer_start, outer_stop, outer_points, dtype=float)
for i, outer in enumerate(outer_vect):
if i % 2 == 0:
inner_vect = numpy.linspace(inner_start, inner_stop, inner_points, dtype=float)
else:
inner_vect = numpy.linspace(inner_stop, inner_start, inner_points, dtype=float)
for inner in inner_vect:
self.positions.append((inner, outer))
def scan_core(self):
inner_motor, outer_motor = self.scan_motors
outer_start, outer_stop, outer_points = self.caller_args[outer_motor]
inner_start, inner_stop, inner_points = self.caller_args[inner_motor]
num_pos = 0
outer_vect = numpy.linspace(outer_start, outer_stop, outer_points, dtype=float)
for i, outer in enumerate(outer_vect):
if i % 2 == 0:
start, stop = inner_start, inner_stop
else:
start, stop = inner_stop, inner_start
# Move outer motor to start position
yield from self.stubs.set(device=outer_motor, value=outer)
outer_readback = yield from self.stubs.send_rpc_and_wait(outer_motor, "position")
inner_time = self.exp_time * inner_points
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_points.put', 2)
yield from self.stubs.send_rpc_and_wait(self.controller, 'num_pulses.put', inner_points)
yield from self.stubs.send_rpc_and_wait(self.controller, 'start_pulses.put', 1)
yield from self.stubs.send_rpc_and_wait(self.controller, 'end_pulses.put', 2)
yield from self.stubs.send_rpc_and_wait(self.controller, 'time_mode.put', 1)
yield from self.stubs.send_rpc_and_wait(self.controller, 'times.put', [inner_time, inner_time])
for axis_name in self.all_motors:
if axis_name == inner_motor:
yield from self.stubs.send_rpc_and_wait(self.controller,
f'{axis_name}.positions.put',
(start, stop))
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 1)
else:
yield from self.stubs.send_rpc_and_wait(self.controller, f'{axis_name}.use_axis.put', 0)
yield from self.stubs.send_rpc_and_wait(self.controller, 'build_profile')
build_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'build_status.get')
if build_status != 1:
raise ScanAbortion('Profile build failed')
yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_profile')
execute_status = yield from self.stubs.send_rpc_and_wait(self.controller, 'execute_status.get')
if execute_status != 1:
raise ScanAbortion('Profile execute failed')
yield from self.stubs.send_rpc_and_wait(self.controller, 'readback_profile')
readback_status = yield from self.stubs.send_rpc_and_wait(self.controller,'readback_status.get')
if readback_status != 1:
raise ScanAbortion('Profile readback failed')
readbacks = yield from self.stubs.send_rpc_and_wait(self.controller, f'{inner_motor}.readbacks.get')
self._publish_readbacks(inner_motor, readbacks)
readbacks = [outer_readback] * len(readbacks)
self._publish_readbacks(outer_motor, readbacks)
# motor readbacks have more points than generated pulses
num_pos += len(readbacks)
self.num_pos = num_pos
logger.success(f'{self.scan_name} finished')
def _publish_readbacks(self, device, readbacks):
metadata = {"async_update": "append", "max_shape": [None]}
msg = messages.DeviceMessage(
signals={device: {'value': readbacks} }, metadata=metadata
)
self.stubs.connector.xadd(
topic=MessageEndpoints.device_async_readback(
scan_id=self.scan_id, device=device
),
msg_dict={"data": msg},
expire=1800,
)

1
bin/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
# Add anything you don't want to check in to git, e.g. very large files

View File

@@ -5,7 +5,7 @@ build-backend = "hatchling.build"
[project]
name = "addams_bec"
version = "0.0.0"
description = "Custom device implementations based on the ophyd hardware abstraction layer"
description = "A plugin repository for BEC"
requires-python = ">=3.10"
classifiers = [
"Development Status :: 3 - Alpha",
@@ -17,6 +17,7 @@ dependencies = []
[project.optional-dependencies]
dev = [
"black",
"copier",
"isort",
"coverage",
"pylint",
@@ -38,12 +39,15 @@ plugin_file_writer = "addams_bec.file_writer"
[project.entry-points."bec.scans"]
plugin_scans = "addams_bec.scans"
[project.entry-points."bec.scans.metadata_schema"]
plugin_metadata_schema = "addams_bec.scans.metadata_schema"
[project.entry-points."bec.ipython_client_startup"]
plugin_ipython_client_pre = "addams_bec.bec_ipython_client.startup.pre_startup"
plugin_ipython_client_post = "addams_bec.bec_ipython_client.startup"
[project.entry-points."bec.widgets.auto_updates"]
plugin_widgets_update = "addams_bec.bec_widgets.auto_updates:PlotUpdate"
plugin_widgets_update = "addams_bec.bec_widgets.auto_updates"
[project.entry-points."bec.widgets.user_widgets"]
plugin_widgets = "addams_bec.bec_widgets.widgets"

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,38 @@
from unittest import mock
import pytest
from addams_bec.bec_ipython_client.plugins.energy_optimizer.addams_energy_optimizer import (
EnergyOptimizer,
)
@pytest.fixture
def optimizer():
dm = mock.MagicMock()
yield EnergyOptimizer(dm)
@pytest.mark.parametrize(
"start_energy, target_energy, expected",
[
(5, 10, [(10, "Si")]),
(5, 20, [(9, "Rh"), (20, "Rh")]),
(5, 40, [(9, "Rh"), (22, "Pt"), (40, "Pt")]),
(5, 5, [(5, "Si")]),
(5, 4, ValueError),
(5, 41, ValueError),
(2, 8, ValueError),
(18, 40, [(22, "Pt"), (40, "Pt")]),
(18, 5, [(9, "Si"), (5, "Si")]),
(18, 10, [(10, "Rh")]),
(18, 20, [(20, "Rh")]),
(25, 7, [(22, "Rh"), (9, "Si"), (7, "Si")]),
],
)
def test_get_transition_steps(optimizer, start_energy, target_energy, expected):
if expected == ValueError:
with pytest.raises(ValueError):
optimizer.get_transition_steps(start_energy, target_energy)
else:
assert optimizer.get_transition_steps(start_energy, target_energy) == expected

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).