48 Commits

Author SHA1 Message Date
aacerbo
4ef1b3e84b adding placeholders for EPICS AD devices 2024-08-07 21:12:02 +02:00
ci_update_bot
adb94e2b75 docs: Update device list 2024-08-06 08:20:20 +00:00
gac-x01da (Resp. Clark Adam Hugh)
79f5d42ffc fix: add specific ScanControlLoadMessages 2024-08-06 10:14:40 +02:00
gac-x01da (Resp. Clark Adam Hugh)
5296c70ebb fix: removed stopped bug at beamline with device 2024-08-05 13:49:45 +02:00
d0c81d20e7 fix: improve error handling in wait_for_status and _move_and_finish 2024-07-29 10:45:51 +02:00
ce3718dcf6 refactor: cleanup for device and tests 2024-07-27 10:09:31 +02:00
gac-x01da (Resp. Clark Adam Hugh)
59675e0038 fix: refactoring at the beamline 2024-07-26 17:11:33 +02:00
29076425c4 test: review and complement tests 2024-07-26 17:11:33 +02:00
d26834c5d4 refactor: mono scans and device refactoring, added dataclass for scan args 2024-07-26 17:11:33 +02:00
gac-x01da (Resp. Clark Adam Hugh)
7d3bd609d1 feat: first draft for XRD scan 2024-07-26 17:11:33 +02:00
bc2b884413 fix: fixed readoutpriority in config 2024-07-24 15:14:24 +02:00
78857a0a84 fix: fixed import of devices for x01da_database.yaml 2024-07-24 15:11:45 +02:00
de2711ba48 feat: added exp hutch configs 2024-07-24 15:09:10 +02:00
ci_update_bot
82da8b2555 docs: Update device list 2024-07-19 09:18:15 +00:00
gac-x01da (Resp. Clark Adam Hugh)
850c07a6f9 fix: tested and fixed move with device at Debye 2024-07-19 10:53:34 +02:00
4db4acbcc8 fix: remove stop_scan from stop 2024-07-19 10:22:17 +02:00
9cf9e640e5 test: add tests and review scan and device 2024-07-19 10:20:09 +02:00
5201bbf480 refactor: refactored methods, changed PV kinds and added more info to docstrings 2024-07-19 10:20:09 +02:00
gac-x01da (Resp. Clark Adam Hugh)
8eab8e0c6a refactor: refactored motor and scan at Debye 2024-07-19 10:20:09 +02:00
b441f5db65 fix: adapt scaninfo to pass on scanning parameters 2024-07-19 10:20:09 +02:00
422b789892 refactor: review scan draft, and refactor ophyd positioner, add tests for initial scan draft 2024-07-19 10:20:09 +02:00
a693ac7778 feat: oscillation scan on the mono bragg motor 2024-07-19 10:20:09 +02:00
ci_update_bot
8468435151 docs: Update device list 2024-07-17 08:26:13 +00:00
1eb1f2cb34 fix: improve kickoff/complete logic, add test for complete 2024-07-16 17:36:41 +02:00
434e863bd2 test: add tests for setting simple XAS scans 2024-07-16 17:24:15 +02:00
0298604945 refactor: renamed readback, setpoint to comply better with ophyd.PositionerBase 2024-07-16 17:23:01 +02:00
gac-x01da (Resp. Clark Adam Hugh)
6c99e40a7b fix: fixed mo1_bragg functionality at the beamline, changed .set to .put for stability 2024-07-16 14:51:02 +02:00
734f7e7133 refactor: small adjustments to exception handling 2024-07-15 16:07:57 +02:00
15baae2c0c fix: add motor to debye config 2024-07-12 14:24:04 +02:00
6a8acff77d tests: add tests for Mo1Bragg 2024-07-12 14:23:56 +02:00
0dd8d7ae83 refactor: refactoring of mo1_bragg, add move method 2024-07-12 11:56:37 +02:00
gac-x01da (Resp. Clark Adam Hugh)
ad0db78e45 feat: add first draft for mo1bragg ophyd positioner 2024-07-11 15:43:04 +02:00
ci_update_bot
abc5271a02 docs: Update device list 2024-06-11 08:42:01 +00:00
61df1b4cf2 fix(startup): fixed service startup for clients 2024-06-11 10:37:51 +02:00
ci_update_bot
4231014463 docs: Update device list 2024-05-16 15:45:20 +00:00
143a39f664 fix: adapt imports to refactoring in bec_lib 2024-05-15 14:58:26 +02:00
3a64392bcb ci: added child pipeline var 2024-05-13 16:38:58 +02:00
9f80946289 ci: move ci to awi-utils 2024-05-07 14:42:35 +02:00
1c9a971a5a Merge branch 'fix/signal_data' into 'main'
fix: fixed parsing of ophyd-compatible signal data

See merge request bec/debye_bec!12
2024-05-07 14:41:07 +02:00
8fa72385ae fix: fixed parsing of ophyd-compatible signal data 2024-05-07 14:39:15 +02:00
8bee809417 Merge branch 'wakonig_k-main-patch-37578' into 'main'
ci: removed needs for additional tests

See merge request bec/debye-bec!10
2024-04-24 21:54:47 +02:00
f84bc177e2 Update .gitlab-ci.yml 2024-04-24 18:25:15 +02:00
67b65f47fd ci: removed needs for additional tests 2024-04-24 18:22:41 +02:00
43d3909e69 Merge branch 'fix/config_update' into 'main'
fix: readoutPriority for config

See merge request bec/debye-bec!9
2024-04-24 18:21:14 +02:00
a7c7569f72 fix: readoutPriority for config 2024-04-24 18:15:18 +02:00
dea30bd41d feat: added x01da_database from internal git 2024-04-24 18:11:03 +02:00
568c1b413a Merge branch 'refactor/plugin_structure' into 'main'
refactor: upgraded to new plugin structure

See merge request bec/debye-bec!8
2024-04-24 11:32:42 +02:00
71ebbd86be refactor: upgraded to new plugin structure 2024-04-24 11:15:55 +02:00
50 changed files with 2982 additions and 442 deletions

3
.git_hooks/post-commit Normal file
View File

@@ -0,0 +1,3 @@
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
semantic-release changelog -D version_variable=$SCRIPT_DIR/../../semantic_release/__init__.py:__version__
semantic-release version -D version_variable=$SCRIPT_DIR/../../semantic_release/__init__.py:__version__

3
.git_hooks/pre-commit Normal file
View File

@@ -0,0 +1,3 @@
black --line-length=100 $(git diff --cached --name-only --diff-filter=ACM -- '*.py')
isort --line-length=100 --profile=black --multi-line=3 --trailing-comma $(git diff --cached --name-only --diff-filter=ACM -- '*.py')
git add $(git diff --cached --name-only --diff-filter=ACM -- '*.py')

3
.gitignore vendored
View File

@@ -8,6 +8,9 @@
**/.pytest_cache
**/*.egg*
# recovery_config files
recovery_config_*
# file writer data
**.h5

View File

@@ -1,40 +1,7 @@
# This file is a template, and might need editing before it works on your project.
# Official language image. Look for the different tagged releases at:
# https://hub.docker.com/r/library/python/tags/
image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:3.10
#commands to run in the Docker container before starting each job.
before_script:
- pip install -e .[dev]
# different stages in the pipeline
stages:
- Formatter
- Test
- AdditionalTests
- Deploy
formatter:
stage: Formatter
script:
- pip install black
- black --check --diff --color --line-length=100 ./
pytest:
stage: Test
script:
- pytest -v --random-order ./tests
tests-3.11:
stage: AdditionalTests
image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:3.11
needs: ["pytest"]
script:
- pytest -v --random-order ./tests
allow_failure: true
tests-3.12:
extends: "tests-3.11"
stage: AdditionalTests
image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:3.12
allow_failure: true
include:
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "debye"
target: "debye_bec"
branch: $CHILD_PIPELINE_BRANCH

View File

@@ -1 +0,0 @@
from .bec_client import *

View File

@@ -1 +0,0 @@
from .plugins import *

View File

@@ -1,245 +0,0 @@
from bec_lib.devicemanager import Device
from bec_lib.scan_report import ScanReport
# pylint:disable=undefined-variable
# pylint: disable=too-many-arguments
def dscan(
motor1: Device, m1_from: float, m1_to: float, steps: int, exp_time: float, **kwargs
) -> ScanReport:
"""Relative line scan with one device.
Args:
motor1 (Device): Device that should be scanned.
m1_from (float): Start position relative to the current position.
m1_to (float): End position relative to the current position.
steps (int): Number of steps.
exp_time (float): Exposure time.
Returns:
ScanReport: Status object.
Examples:
>>> dscan(dev.motor1, -5, 5, 10, 0.1)
"""
return scans.line_scan(
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=True, **kwargs
)
def d2scan(
motor1: Device,
m1_from: float,
m1_to: float,
motor2: Device,
m2_from: float,
m2_to: float,
steps: int,
exp_time: float,
**kwargs
) -> ScanReport:
"""Relative line scan with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device relative to its current position.
m1_to (float): End position of the first device relative to its current position.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device relative to its current position.
m2_to (float): End position of the second device relative to its current position.
steps (int): Number of steps.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> d2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.line_scan(
motor1,
m1_from,
m1_to,
motor2,
m2_from,
m2_to,
steps=steps,
exp_time=exp_time,
relative=True,
**kwargs
)
def ascan(motor1, m1_from, m1_to, steps, exp_time, **kwargs):
"""Absolute line scan with one device.
Args:
motor1 (Device): Device that should be scanned.
m1_from (float): Start position.
m1_to (float): End position.
steps (int): Number of steps.
exp_time (float): Exposure time.
Returns:
ScanReport: Status object.
Examples:
>>> ascan(dev.motor1, -5, 5, 10, 0.1)
"""
return scans.line_scan(
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=False, **kwargs
)
def a2scan(motor1, m1_from, m1_to, motor2, m2_from, m2_to, steps, exp_time, **kwargs):
"""Absolute line scan with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device.
m1_to (float): End position of the first device.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device.
m2_to (float): End position of the second device.
steps (int): Number of steps.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> a2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.line_scan(
motor1,
m1_from,
m1_to,
motor2,
m2_from,
m2_to,
steps=steps,
exp_time=exp_time,
relative=False,
**kwargs
)
def dmesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
"""Relative mesh scan (grid scan) with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device relative to its current position.
m1_to (float): End position of the first device relative to its current position.
m1_steps (int): Number of steps for motor1.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device relative to its current position.
m2_to (float): End position of the second device relative to its current position.
m2_steps (int): Number of steps for motor2.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> dmesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.grid_scan(
motor1,
m1_from,
m1_to,
m1_steps,
motor2,
m2_from,
m2_to,
m2_steps,
exp_time=exp_time,
relative=True,
)
def amesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
"""Absolute mesh scan (grid scan) with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device.
m1_to (float): End position of the first device.
m1_steps (int): Number of steps for motor1.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device.
m2_to (float): End position of the second device.
m2_steps (int): Number of steps for motor2.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> amesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.grid_scan(
motor1,
m1_from,
m1_to,
m1_steps,
motor2,
m2_from,
m2_to,
m2_steps,
exp_time=exp_time,
relative=False,
)
def umv(*args) -> ScanReport:
"""Updated absolute move (i.e. blocking) for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> umv(dev.samx, 1)
>>> umv(dev.samx, 1, dev.samy, 2)
"""
return scans.umv(*args, relative=False)
def umvr(*args) -> ScanReport:
"""Updated relative move (i.e. blocking) for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> umvr(dev.samx, 1)
>>> umvr(dev.samx, 1, dev.samy, 2)
"""
return scans.umv(*args, relative=True)
def mv(*args) -> ScanReport:
"""Absolute move for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> mv(dev.samx, 1)
>>> mv(dev.samx, 1, dev.samy, 2)
"""
return scans.mv(*args, relative=False)
def mvr(*args) -> ScanReport:
"""Relative move for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> mvr(dev.samx, 1)
>>> mvr(dev.samx, 1, dev.samy, 2)
"""
return scans.mv(*args, relative=True)

View File

@@ -1 +0,0 @@

View File

@@ -1,46 +0,0 @@
"""
Post startup script for the BEC client. This script is executed after the
IPython shell is started. It is used to load the beamline specific
information and to setup the prompts.
The script is executed in the global namespace of the IPython shell. This
means that all variables defined here are available in the shell.
If needed, bec command-line arguments can be parsed here. For example, to
parse the --session argument, add the following lines to the script:
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--session", help="Session name", type=str, default="my_default_session")
args = parser.parse_args()
if args.session == "my_session":
print("Loading my_session session")
from bec_plugins.bec_client.plugins.my_session import *
else:
print("Loading default session")
from bec_plugins.bec_client.plugins.default_session import *
"""
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module
import argparse
from bec_lib import bec_logger
logger = bec_logger.logger
logger.info("Using the Debye startup script.")
parser = argparse.ArgumentParser()
parser.add_argument("--session", help="Session name", type=str, default="Debye")
args = parser.parse_args()
# SETUP BEAMLINE INFO
from bec_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo
bec._beamline_mixin._bl_info_register(SLSInfo)
bec._beamline_mixin._bl_info_register(OperatorInfo)
# SETUP PROMPTS
bec._ip.prompts.username = "Debye"
bec._ip.prompts.status = 1

View File

@@ -1,25 +0,0 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to set up the BEC client configuration. The script is
executed in the global namespace of the BEC client. This means that all
variables defined here are available in the BEC client.
To set up the BEC client configuration, use the ServiceConfig class. For example,
to set the configuration file path, add the following lines to the script:
import pathlib
from bec_lib import ServiceConfig
current_path = pathlib.Path(__file__).parent.resolve()
CONFIG_PATH = f"{current_path}/<path_to_my_config_file.yaml>"
config = ServiceConfig(CONFIG_PATH)
If this startup script defined a ServiceConfig object, the BEC client will use
it to configure itself. Otherwise, the BEC client will use the default config.
"""
# example:
# current_path = pathlib.Path(__file__).parent.resolve()
# CONFIG_PATH = f"{current_path}/../../../bec_config.yaml"
# config = ServiceConfig(CONFIG_PATH)

View File

@@ -0,0 +1,36 @@
"""
Post startup script for the BEC client. This script is executed after the
IPython shell is started. It is used to load the beamline specific
information and to setup the prompts.
The script is executed in the global namespace of the IPython shell. This
means that all variables defined here are available in the shell.
While command-line arguments have to be set in the pre-startup script, the
post-startup script can be used to load beamline specific information and
to setup the prompts.
from bec_lib import bec_logger
logger = bec_logger.logger
# pylint: disable=import-error
_args = _main_dict["args"]
_session_name = "cSAXS"
if _args.session.lower() == "lamni":
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
from csaxs_bec.bec_ipython_client.plugins.LamNI import *
_session_name = "LamNI"
lamni = LamNI(bec)
logger.success("LamNI session loaded.")
elif _args.session.lower() == "csaxs":
print("Loading cSAXS session")
from csaxs_bec.bec_ipython_client.plugins.cSAXS import *
logger.success("cSAXS session loaded.")
"""
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module

View File

@@ -0,0 +1,23 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to add additional command line arguments.
"""
from bec_lib.service_config import ServiceConfig
def extend_command_line_args(parser):
"""
Extend the command line arguments of the BEC client.
"""
# parser.add_argument("--session", help="Session name", type=str, default="cSAXS")
return parser
def get_config() -> ServiceConfig:
"""
Create and return the service configuration.
"""
return ServiceConfig(redis={"host": "x01da-bec-001", "port": 6379})

View File

View File

View File

@@ -0,0 +1,11 @@
import os
def setup_epics_ca():
# os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "NO"
# os.environ["EPICS_CA_ADDR_LIST"] = "129.129.122.255 sls-x12sa-cagw.psi.ch:5836"
os.environ["PYTHONIOENCODING"] = "latin1"
def run():
setup_epics_ca()

View File

View File

@@ -0,0 +1,727 @@
###################
#### FRONT END ####
###################
## Slit Diaphragm -- Physical positioners
sldi_trxr:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Ring-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:TRXR
onFailure: retry
enabled: true
softwareTrigger: false
sldi_trxw:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Wall-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:TRXW
onFailure: retry
enabled: true
softwareTrigger: false
sldi_tryb:
readoutPriority: baseline
description: Front-end slit diaphragm Y-translation Bottom-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:TRYB
onFailure: retry
enabled: true
softwareTrigger: false
sldi_tryt:
readoutPriority: baseline
description: Front-end slit diaphragm X-translation Top-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:TRYT
onFailure: retry
enabled: true
softwareTrigger: false
## Slit Diaphragm -- Virtual positioners
sldi_centerx:
readoutPriority: baseline
description: Front-end slit diaphragm X-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:CENTERX
onFailure: retry
enabled: true
softwareTrigger: false
sldi_gapx:
readoutPriority: baseline
description: Front-end slit diaphragm X-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:GAPX
onFailure: retry
enabled: true
softwareTrigger: false
sldi_centery:
readoutPriority: baseline
description: Front-end slit diaphragm Y-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:CENTERY
onFailure: retry
enabled: true
softwareTrigger: false
sldi_gapy:
readoutPriority: baseline
description: Front-end slit diaphragm Y-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-SLDI:GAPY
onFailure: retry
enabled: true
softwareTrigger: false
## Collimating Mirror -- Physical Positioners
cm_trxu:
readoutPriority: baseline
description: Collimating Mirror X-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:TRXU
onFailure: retry
enabled: true
softwareTrigger: false
cm_trxd:
readoutPriority: baseline
description: Collimating Mirror X-translation downstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:TRXD
onFailure: retry
enabled: true
softwareTrigger: false
cm_tryu:
readoutPriority: baseline
description: Collimating Mirror Y-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:TRYU
onFailure: retry
enabled: true
softwareTrigger: false
cm_trydr:
readoutPriority: baseline
description: Collimating Mirror Y-translation downstream ring
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:TRYDR
onFailure: retry
enabled: true
softwareTrigger: false
cm_trydw:
readoutPriority: baseline
description: Collimating Mirror Y-translation downstream wall
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:TRYDW
onFailure: retry
enabled: true
softwareTrigger: false
cm_bnd:
readoutPriority: baseline
description: Collimating Mirror bender
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:BND
onFailure: retry
enabled: true
softwareTrigger: false
## Collimating Mirror -- Virtual Positioners
cm_rotx:
readoutPriority: baseline
description: Collimating Morror Pitch
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
cm_roty:
readoutPriority: baseline
description: Collimating Morror Yaw
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
cm_rotz:
readoutPriority: baseline
description: Collimating Morror Roll
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:ROTZ
onFailure: retry
enabled: true
softwareTrigger: false
cm_xctp:
readoutPriority: baseline
description: Collimating Morror Center Point X
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:XTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_ytcp:
readoutPriority: baseline
description: Collimating Morror Center Point Y
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:YTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_ztcp:
readoutPriority: baseline
description: Collimating Morror Center Point Z
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:ZTCP
onFailure: retry
enabled: true
softwareTrigger: false
cm_xstripe:
readoutPriority: baseline
description: Collimating Morror X Stripe
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-FE-CM:XSTRIPE
onFailure: retry
enabled: true
softwareTrigger: false
###################
###### OPTICS #####
###################
## Bragg Monochromator
mo1_bragg:
readoutPriority: baseline
description: Positioner for the Monochromator
deviceClass: debye_bec.devices.mo1_bragg.Mo1Bragg
deviceConfig:
prefix: "X01DA-OP-MO1:BRAGG:"
onFailure: retry
enabled: true
softwareTrigger: false
## Monochromator -- Physical Positioners
mo_try:
readoutPriority: baseline
description: Monochromator Y Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-MO1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
mo_trx:
readoutPriority: baseline
description: Monochromator X Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-MO1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
mo_roty:
readoutPriority: baseline
description: Monochromator Yaw
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-MO1:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
## Focusing Mirror -- Physical Positioners
fm_trxu:
readoutPriority: baseline
description: Focusing Mirror X-translation upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:TRXU
onFailure: retry
enabled: true
softwareTrigger: false
fm_trxd:
readoutPriority: baseline
description: Focusing Mirror X-translation downstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:TRXD
onFailure: retry
enabled: true
softwareTrigger: false
fm_tryd:
readoutPriority: baseline
description: Focusing Mirror Y-translation downstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:TRYD
onFailure: retry
enabled: true
softwareTrigger: false
fm_tryur:
readoutPriority: baseline
description: Focusing Mirror Y-translation upstream ring
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:TRYUR
onFailure: retry
enabled: true
softwareTrigger: false
fm_tryuw:
readoutPriority: baseline
description: Focusing Mirror Y-translation upstream wall
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:TRYUW
onFailure: retry
enabled: true
softwareTrigger: false
fm_bnd:
readoutPriority: baseline
description: Focusing Mirror bender
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:BND
onFailure: retry
enabled: true
softwareTrigger: false
## Focusing Mirror -- Virtual Positioners
fm_rotx:
readoutPriority: baseline
description: Focusing Morror Pitch
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
fm_roty:
readoutPriority: baseline
description: Focusing Morror Yaw
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:ROTY
onFailure: retry
enabled: true
softwareTrigger: false
fm_rotz:
readoutPriority: baseline
description: Focusing Morror Roll
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:ROTZ
onFailure: retry
enabled: true
softwareTrigger: false
fm_xctp:
readoutPriority: baseline
description: Focusing Morror Center Point X
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:XTCP
onFailure: retry
enabled: true
softwareTrigger: false
fm_ytcp:
readoutPriority: baseline
description: Focusing Morror Center Point Y
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:YTCP
onFailure: retry
enabled: true
softwareTrigger: false
fm_ztcp:
readoutPriority: baseline
description: Focusing Morror Center Point Z
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:ZTCP
onFailure: retry
enabled: true
softwareTrigger: false
fm_xstripe:
readoutPriority: baseline
description: Focusing Morror X Stripe
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-FM:XSTRIPE
onFailure: retry
enabled: true
softwareTrigger: false
## Optics Slits 1 -- Physical positioners
sl1_trxr:
readoutPriority: baseline
description: Optics slits 1 X-translation Ring-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:TRXR
onFailure: retry
enabled: true
softwareTrigger: false
sl1_trxw:
readoutPriority: baseline
description: Optics slits 1 X-translation Wall-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:TRXW
onFailure: retry
enabled: true
softwareTrigger: false
sl1_tryb:
readoutPriority: baseline
description: Optics slits 1 Y-translation Bottom-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:TRYB
onFailure: retry
enabled: true
softwareTrigger: false
sl1_tryt:
readoutPriority: baseline
description: Optics slits 1 X-translation Top-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:TRYT
onFailure: retry
enabled: true
softwareTrigger: false
bm1_try:
readoutPriority: baseline
description: Beam Monitor 1 Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-BM1:TRY
onFailure: retry
enabled: true
softwareTrigger: false
## Optics Slits 1 -- Virtual positioners
sl1_centerx:
readoutPriority: baseline
description: Optics slits 1 X-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:CENTERX
onFailure: retry
enabled: true
softwareTrigger: false
sl1_gapx:
readoutPriority: baseline
description: Optics slits 1 X-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:GAPX
onFailure: retry
enabled: true
softwareTrigger: false
sl1_centery:
readoutPriority: baseline
description: Optics slits 1 Y-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:CENTERY
onFailure: retry
enabled: true
softwareTrigger: false
sl1_gapy:
readoutPriority: baseline
description: Optics slits 1 Y-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL1:GAPY
onFailure: retry
enabled: true
softwareTrigger: false
## Optics Slits 2 -- Physical positioners
sl2_trxr:
readoutPriority: baseline
description: Optics slits 2 X-translation Ring-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:TRXR
onFailure: retry
enabled: true
softwareTrigger: false
sl2_trxw:
readoutPriority: baseline
description: Optics slits 2 X-translation Wall-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:TRXW
onFailure: retry
enabled: true
softwareTrigger: false
sl2_tryb:
readoutPriority: baseline
description: Optics slits 2 Y-translation Bottom-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:TRYB
onFailure: retry
enabled: true
softwareTrigger: false
sl2_tryt:
readoutPriority: baseline
description: Optics slits 2 X-translation Top-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:TRYT
onFailure: retry
enabled: true
softwareTrigger: false
bm2_try:
readoutPriority: baseline
description: Beam Monitor 2 Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-BM2:TRY
onFailure: retry
enabled: true
softwareTrigger: false
## Optics Slits 2 -- Virtual positioners
sl2_centerx:
readoutPriority: baseline
description: Optics slits 2 X-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:CENTERX
onFailure: retry
enabled: true
softwareTrigger: false
sl2_gapx:
readoutPriority: baseline
description: Optics slits 2 X-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:GAPX
onFailure: retry
enabled: true
softwareTrigger: false
sl2_centery:
readoutPriority: baseline
description: Optics slits 2 Y-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:CENTERY
onFailure: retry
enabled: true
softwareTrigger: false
sl2_gapy:
readoutPriority: baseline
description: Optics slits 2 Y-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-OP-SL2:GAPY
onFailure: retry
enabled: true
softwareTrigger: false
###############################
###### EXPERIMENTAL HUTCH #####
###############################
###########################################
## Optical Table -- Physical Positioners ##
###########################################
ot_tryu:
readoutPriority: baseline
description: Optical Table Y-Translation Upstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES-OT:TRYU
onFailure: retry
enabled: true
softwareTrigger: false
ot_tryd:
readoutPriority: baseline
description: Optical Table Y-Translation Downstream
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES-OT:TRYD
onFailure: retry
enabled: true
softwareTrigger: false
############################################
## Optical Table -- Virtual Positioners ###
############################################
ot_try:
readoutPriority: baseline
description: Optical Table Y-Translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES-OT:TRY
onFailure: retry
enabled: true
softwareTrigger: false
ot_pitch:
readoutPriority: baseline
description: Optical Table Pitch
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES-OT:ROTX
onFailure: retry
enabled: true
softwareTrigger: false
###############################################
## End Station Slits -- Physical Positioners ##
###############################################
es0sl_trxr:
readoutPriority: baseline
description: End Station slits X-translation Ring-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-SL:TRXR
onFailure: retry
enabled: true
softwareTrigger: false
es0sl_trxw:
readoutPriority: baseline
description: End Station slits X-translation Wall-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-SL:TRXW
onFailure: retry
enabled: true
softwareTrigger: false
es0sl_tryb:
readoutPriority: baseline
description: End Station slits Y-translation Bottom-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-SL:TRYB
onFailure: retry
enabled: true
softwareTrigger: false
es0sl_tryt:
readoutPriority: baseline
description: End Station slits X-translation Top-edge
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-SL:TRYT
onFailure: retry
enabled: true
softwareTrigger: false
##############################################
## End Station Slits -- Virtual positioners ##
##############################################
es0sl_center:
readoutPriority: baseline
description: End Station slits X-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-SL:CENTERX
onFailure: retry
enabled: true
softwareTrigger: false
es0sl_gapx:
readoutPriority: baseline
description: End Station slits X-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-SL:GAPX
onFailure: retry
enabled: true
softwareTrigger: false
es0sl_centery:
readoutPriority: baseline
description: End Station slits Y-center
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-SL:CENTERY
onFailure: retry
enabled: true
softwareTrigger: false
es0sl_gapy:
readoutPriority: baseline
description: End Station slits Y-gap
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-SL:GAPY
onFailure: retry
enabled: true
softwareTrigger: false
#########################################
## Exit Window -- Physical Positioners ##
#########################################
es0wi_try:
readoutPriority: baseline
description: End Station 0 Exit Window Y-translation
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X01DA-ES0-WI:TRY
onFailure: retry
enabled: true
softwareTrigger: false
##########################
## AreaDetector Devices ##
##########################
#op_bpm1:
# readoutPriority: baseline
# description: Optics Hutch Beam Position Monitor 1
# deviceClass: ophyd.AreaDetector
# deviceConfig:
# prefix: X01DA-OP-GIGE01:
# onFailure: retry
# enabled: true
# softwareTrigger: false
#op_bpm2:
# readoutPriority: baseline
# description: Optics Hutch Beam Position Monitor 2
# deviceClass: ophyd.AreaDetector
# deviceConfig:
# prefix: X01DA-OP-GIGE02:
# onFailure: retry
# enabled: true
# softwareTrigger: false
#es_xryeye:
# readoutPriority: baseline
# description: X-Ray Eye
# deviceClass: ophyd.AreaDetector
# deviceConfig:
# prefix: X01DA-:
# onFailure: retry
# enabled: true
# softwareTrigger: false

View File

@@ -0,0 +1,19 @@
## Bragg Monochromator
mo1_bragg:
readoutPriority: baseline
description: Positioner for the Monochromator
deviceClass: debye_bec.devices.mo1_bragg.Mo1Bragg
deviceConfig:
prefix: "X01DA-OP-MO1:BRAGG:"
onFailure: retry
enabled: true
softwareTrigger: false
dummy_pv:
readoutPriority: monitored
description: Heartbeat of Bragg
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
read_pv: "X01DA-OP-MO1:BRAGG:heartbeat_RBV"
onFailure: retry
enabled: true
softwareTrigger: false

View File

View File

@@ -0,0 +1,11 @@
// This file was autogenerated. Do not edit it manually.
## Device List
### debye_bec
| Device | Documentation | Module |
| :----- | :------------- | :------ |
| Mo1Bragg | Class for the Mo1 Bragg positioner of the Debye beamline.<br> The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG: which is connected to<br> the NI motor controller via web sockets.<br> | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggCrystal | Mo1 Bragg PVs to set the crystal parameters | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggEncoder | Mo1 Bragg PVs to communicate with the encoder | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggScanControl | Mo1 Bragg PVs to control the scan after setting the parameters. | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggScanSettings | Mo1 Bragg PVs to set the scan setttings | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |
| Mo1BraggStatus | Mo1 Bragg PVs for status monitoring | [debye_bec.devices.mo1_bragg](https://gitlab.psi.ch/bec/debye_bec/-/blob/main/debye_bec/devices/mo1_bragg.py) |

View File

@@ -0,0 +1,891 @@
""" Module for the Mo1 Bragg positioner of the Debye beamline.
The soft IOC is reachable via the EPICS prefix X01DA-OP-MO1:BRAGG: and connected
to a motor controller via web sockets. The Mo1 Bragg positioner is not only a
positioner, but also a scan controller to setup XAS and XRD scans. A few scan modes
are programmed in the controller, e.g. simple and advanced XAS scans + XRD triggering mode.
Note: For some of the Epics PVs, in particular action buttons, the suffix .PROC and
put_complete=True is used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action."""
import enum
import threading
import time
import traceback
from dataclasses import dataclass
from typing import Literal
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import (
Device,
DeviceStatus,
EpicsSignal,
EpicsSignalRO,
EpicsSignalWithRBV,
Kind,
PositionerBase,
Signal,
Staged,
)
from ophyd.utils import LimitError
from ophyd_devices.utils import bec_scaninfo_mixin, bec_utils
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
logger = bec_logger.logger
class ScanControlScanStatus(int, enum.Enum):
"""Enum class for the scan status of the Bragg positioner"""
PARAMETER_WRONG = 0
VALIDATION_PENDING = 1
READY = 2
RUNNING = 3
class ScanControlLoadMessage(int, enum.Enum):
"""Enum for validating messages for load message of the Bragg positioner"""
PENDING = 0
STARTED = 1
SUCCESS = 2
ERR_XRD_MEAS_LEN_LOW = 3
ERR_XRD_N_TRIGGERS_LOW = 4
ERR_XRD_TRIGS_EVERY_N_LOW = 5
ERR_XRD_MEAS_LEN_HI = 6
ERR_XRD_N_TRIGGERS_HI = 7
ERR_XRD_TRIGS_EVERY_N_HI = 8
ERR_SCAN_HI_ANGLE_LIMIT = 9
ERR_SCAN_LOW_ANGLE_LIMITS = 10
ERR_SCAN_TIME = 11
ERR_SCAN_VEL_TOO_HI = 12
ERR_SCAN_ANGLE_OUT_OF_LIM = 13
ERR_SCAN_HIGH_VEL_LAR_42 = 14
ERR_SCAN_MODE_INVALID = 15
class Mo1BraggError(Exception):
"""Mo1Bragg specific exception"""
class MoveType(str, enum.Enum):
"""Enum class to switch between move types energy and angle for the Bragg positioner"""
ENERGY = "energy"
ANGLE = "angle"
class ScanControlMode(int, enum.Enum):
"""Enum class for the scan control mode of the Bragg positioner"""
SIMPLE = 0
ADVANCED = 1
class MoveTypeSignal(Signal):
"""Custom Signal to set the move type of the Bragg positioner"""
# pylint: disable=arguments-differ
def set(self, value: str | MoveType) -> None:
"""Returns currently active move method
Args:
value (str | MoveType) : Can be either 'energy' or 'angle'
"""
value = MoveType(value.lower())
self._readback = value.value
class Mo1BraggStatus(Device):
"""Mo1 Bragg PVs for status monitoring"""
error_status = Cpt(EpicsSignalRO, suffix="error_status_RBV", kind="config", auto_monitor=True)
brake_enabled = Cpt(EpicsSignalRO, suffix="brake_enabled_RBV", kind="config", auto_monitor=True)
mot_commutated = Cpt(
EpicsSignalRO, suffix="mot_commutated_RBV", kind="config", auto_monitor=True
)
axis_enabled = Cpt(EpicsSignalRO, suffix="axis_enabled_RBV", kind="config", auto_monitor=True)
enc_initialized = Cpt(
EpicsSignalRO, suffix="enc_initialized_RBV", kind="config", auto_monitor=True
)
heartbeat = Cpt(EpicsSignalRO, suffix="heartbeat_RBV", kind="config", auto_monitor=True)
class Mo1BraggEncoder(Device):
"""Mo1 Bragg PVs to communicate with the encoder"""
enc_reinit = Cpt(EpicsSignal, suffix="enc_reinit.PROC", kind="config")
enc_reinit_done = Cpt(EpicsSignalRO, suffix="enc_reinit_done_RBV", kind="config")
class Mo1BraggCrystal(Device):
"""Mo1 Bragg PVs to set the crystal parameters"""
offset_si111 = Cpt(EpicsSignalWithRBV, suffix="offset_si111", kind="config")
offset_si311 = Cpt(EpicsSignalWithRBV, suffix="offset_si311", kind="config")
xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind="config")
d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config")
d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config")
set_offset = Cpt(EpicsSignal, suffix="set_offset.PROC", kind="config", put_complete=True)
current_xtal = Cpt(
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True
)
class Mo1BraggScanSettings(Device):
"""Mo1 Bragg PVs to set the scan setttings"""
# XRD settings
xrd_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_select_ref_ENUM", kind="config")
xrd_enable_hi_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_enable_hi_ENUM", kind="config")
xrd_time_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_time_hi", kind="config")
xrd_n_trigger_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_n_trigger_hi", kind="config")
xrd_every_n_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_every_n_hi", kind="config")
xrd_enable_lo_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_enable_lo_ENUM", kind="config")
xrd_time_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_time_lo", kind="config")
xrd_n_trigger_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_n_trigger_lo", kind="config")
xrd_every_n_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_every_n_lo", kind="config")
# XAS simple scan settings
s_scan_angle_hi = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind="config")
s_scan_angle_lo = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind="config")
s_scan_energy_lo = Cpt(
EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind="config", auto_monitor=True
)
s_scan_energy_hi = Cpt(
EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind="config", auto_monitor=True
)
s_scan_scantime = Cpt(
EpicsSignalWithRBV, suffix="s_scan_scantime", kind="config", auto_monitor=True
)
# XAS advanced scan settings
a_scan_pos = Cpt(EpicsSignalWithRBV, suffix="a_scan_pos", kind="config", auto_monitor=True)
a_scan_vel = Cpt(EpicsSignalWithRBV, suffix="a_scan_vel", kind="config", auto_monitor=True)
a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="config", auto_monitor=True)
class Mo1BraggScanControl(Device):
"""Mo1 Bragg PVs to control the scan after setting the parameters."""
scan_mode_enum = Cpt(EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind="config")
scan_duration = Cpt(
EpicsSignalWithRBV, suffix="scan_duration", kind="config", auto_monitor=True
)
scan_load = Cpt(EpicsSignal, suffix="scan_load.PROC", kind="config", put_complete=True)
scan_msg = Cpt(EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind="config", auto_monitor=True)
scan_start_infinite = Cpt(
EpicsSignal, suffix="scan_start_infinite.PROC", kind="config", put_complete=True
)
scan_start_timer = Cpt(
EpicsSignal, suffix="scan_start_timer.PROC", kind="config", put_complete=True
)
scan_stop = Cpt(EpicsSignal, suffix="scan_stop.PROC", kind="config", put_complete=True)
scan_status = Cpt(
EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind="config", auto_monitor=True
)
scan_time_left = Cpt(
EpicsSignalRO, suffix="scan_time_left_RBV", kind="config", auto_monitor=True
)
scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="config", auto_monitor=True)
scan_val_reset = Cpt(
EpicsSignal, suffix="scan_val_reset.PROC", kind="config", put_complete=True
)
scan_progress = Cpt(EpicsSignalRO, suffix="scan_progress_RBV", kind="config", auto_monitor=True)
scan_spectra_done = Cpt(
EpicsSignalRO, suffix="scan_n_osc_RBV", kind="config", auto_monitor=True
)
scan_spectra_left = Cpt(
EpicsSignalRO, suffix="scan_n_osc_left_RBV", kind="config", auto_monitor=True
)
@dataclass
class ScanParameter:
"""Dataclass to store the scan parameters for the Mo1 Bragg positioner.
This needs to be in sync with the kwargs of the MO1 Bragg scans from Debye, to
ensure that the scan parameters are correctly set. Any changes in the scan kwargs,
i.e. renaming or adding new parameters, need to be represented here as well."""
scan_time: float = None
scan_duration: float = None
xrd_enable_low: bool = None
xrd_enable_high: bool = None
num_trigger_low: int = None
num_trigger_high: int = None
exp_time_low: float = None
exp_time_high: float = None
cycle_low: int = None
cycle_high: int = None
start: float = None
stop: float = None
class Mo1Bragg(Device, PositionerBase):
"""Class for the Mo1 Bragg positioner of the Debye beamline.
The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG: which is connected to
the NI motor controller via web sockets.
"""
USER_ACCESS = ["set_xtal", "stop_scan"]
crystal = Cpt(Mo1BraggCrystal, "")
encoder = Cpt(Mo1BraggEncoder, "")
scan_settings = Cpt(Mo1BraggScanSettings, "")
scan_control = Cpt(Mo1BraggScanControl, "")
status = Cpt(Mo1BraggStatus, "")
# signal to indicate the move type 'energy' or 'angle'
move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind="config")
# Energy PVs
readback = Cpt(
EpicsSignalRO, suffix="feedback_pos_energy_RBV", kind="hinted", auto_monitor=True
)
setpoint = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind="normal", auto_monitor=True
)
motor_is_moving = Cpt(
EpicsSignalRO, suffix="move_abs_done_RBV", kind="normal", auto_monitor=True
)
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_energy_RBV", kind="config", auto_monitor=True)
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind="config", auto_monitor=True)
velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True)
# Angle PVs
# TODO makd angle motion a pseudo motor
feedback_pos_angle = Cpt(
EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True
)
setpoint_abs_angle = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True
)
low_limit_angle = Cpt(
EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True
)
high_limit_angle = Cpt(
EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True
)
# Execute motion
move_abs = Cpt(EpicsSignal, suffix="move_abs.PROC", kind="config", put_complete=True)
move_stop = Cpt(EpicsSignal, suffix="move_stop.PROC", kind="config", put_complete=True)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
SUB_PROGRESS = "progress"
def __init__(
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
):
"""Initialize the Mo1 Bragg positioner.
Args:
prefix (str): EPICS prefix for the device
name (str): Name of the device
kind (Kind): Kind of the device
device_manager (DeviceManager): Device manager instance
parent (Device): Parent device
kwargs: Additional keyword arguments
"""
super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs)
self._stopped = False
self.device_manager = device_manager
self._move_thread = None
self.service_cfg = None
self.scaninfo = None
# Init scan parameters
self.scan_parameter = ScanParameter()
self.timeout_for_pvwait = 2.5
self.readback.name = self.name
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
if device_manager:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
self.connector = self.device_manager.connector
self._update_scaninfo()
self._on_init()
def _on_init(self):
"""Action to be executed on initialization of the device"""
self.scan_control.scan_progress.subscribe(self._progress_update, run=False)
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback to SUB_PROGRESS subscribers, i.e. BEC.
Args:
value (int) : current progress value
"""
max_value = 100
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=value,
max_value=max_value,
done=bool(max_value == value),
)
def _update_scaninfo(self) -> None:
"""Connect to the ScanInfo mixin"""
self.scaninfo = bec_scaninfo_mixin.BecScaninfoMixin(self.device_manager)
self.scaninfo.load_scan_metadata()
@property
def stopped(self) -> bool:
"""Return the stopped flag. If True, the motion is stopped."""
return self._stopped
def stop(self, *, success=False) -> None:
"""Stop any motion on the positioner
Args:
success (bool) : Flag to indicate if the motion was successful
"""
self.move_stop.put(1)
self._stopped = True
if self._move_thread is not None:
self._move_thread.join()
self._move_thread = None
super().stop(success=success)
def stop_scan(self) -> None:
"""Stop the currently running scan gracefully, this finishes the running oscillation."""
self.scan_control.scan_stop.put(1)
# -------------- Positioner specific methods -----------------#
@property
def limits(self) -> tuple:
"""Return limits of the Bragg positioner"""
if self.move_type.get() == MoveType.ENERGY:
return (self.low_lim.get(), self.high_lim.get())
return (self.low_limit_angle.get(), self.high_limit_angle.get())
@property
def low_limit(self) -> float:
"""Return low limit of axis"""
return self.limits[0]
@property
def high_limit(self) -> float:
"""Return high limit of axis"""
return self.limits[1]
@property
def egu(self) -> str:
"""Return the engineering units of the positioner"""
if self.move_type.get() == MoveType.ENERGY:
return "eV"
return "deg"
@property
def position(self) -> float:
"""Return the current position of Mo1Bragg, considering the move type"""
move_type = self.move_type.get()
move_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle
return move_cpt.get()
# pylint: disable=arguments-differ
def check_value(self, value: float) -> None:
"""Method to check if a value is within limits of the positioner. Called by PositionerBase.move()
Args:
value (float) : value to move axis to.
"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not low_limit <= value <= high_limit:
raise LimitError(f"position={value} not within limits {self.limits}")
def _move_and_finish(
self,
target_pos: float,
move_cpt: Cpt,
read_cpt: Cpt,
status: DeviceStatus,
update_frequency: float = 0.1,
) -> None:
"""Method to be called in the move thread to move the Bragg positioner to the target position.
Args:
target_pos (float) : target position for the motion
move_cpt (Cpt) : component to set the target position on the IOC,
either setpoint or setpoint_abs_angle depending on the move type
read_cpt (Cpt) : component to read the current position of the motion, readback or feedback_pos_angle
status (DeviceStatus) : status object to set the status of the motion
update_frequency (float): Optional, frequency to update the current position of the motion, defaults to 0.1s
"""
try:
# Set the target position on IOC
move_cpt.put(target_pos)
self.move_abs.put(1)
# Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
time.sleep(0.5)
while self.motor_is_moving.get() == 0:
if self.stopped:
raise DeviceStopError(f"Device {self.name} was stopped")
time.sleep(update_frequency)
# pylint: disable=protected-access
status.set_finished()
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Error in move thread of device {self.name}: {content}")
status.set_exception(exc=exc)
def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus:
"""Move the Bragg positioner to the specified value, allows to switch between move types angle and energy.
Args:
value (float) : target value for the motion
move_type (str | MoveType) : Optional, specify the type of move, either 'energy' or 'angle'
Returns:
DeviceStatus : status object to track the motion
"""
self._stopped = False
if move_type is not None:
self.move_type.put(move_type)
move_type = self.move_type.get()
move_cpt = self.setpoint if move_type == MoveType.ENERGY else self.setpoint_abs_angle
read_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle
self.check_value(value)
status = DeviceStatus(device=self)
self._move_thread = threading.Thread(
target=self._move_and_finish, args=(value, move_cpt, read_cpt, status, 0.1)
)
self._move_thread.start()
return status
# -------------- End of Positioner specific methods -----------------#
# -------------- MO1 Bragg specific methods -----------------#
def set_xtal(
self,
xtal_enum: Literal["111", "311"],
offset_si111: float = None,
offset_si311: float = None,
d_spacing_si111: float = None,
d_spacing_si311: float = None,
) -> None:
"""Method to set the crystal parameters of the Bragg positioner
Args:
xtal_enum (Literal["111", "311"]) : Enum to set the crystal orientation
offset_si111 (float) : Offset for the 111 crystal
offset_si311 (float) : Offset for the 311 crystal
d_spacing_si111 (float) : d-spacing for the 111 crystal
d_spacing_si311 (float) : d-spacing for the 311 crystal
"""
if offset_si111 is not None:
self.crystal.offset_si111.put(offset_si111)
if offset_si311 is not None:
self.crystal.offset_si311.put(offset_si311)
if d_spacing_si111 is not None:
self.crystal.d_spacing_si111.put(d_spacing_si111)
if d_spacing_si311 is not None:
self.crystal.d_spacing_si311.put(d_spacing_si311)
if xtal_enum == "111":
crystal_set = 0
elif xtal_enum == "311":
crystal_set = 1
else:
raise ValueError(
f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'"
)
self.crystal.xtal_enum.put(crystal_set)
self.crystal.set_offset.put(1)
def set_xas_settings(self, low: float, high: float, scan_time: float) -> None:
"""Set XAS parameters for upcoming scan.
Args:
low (float): Low energy/angle value of the scan
high (float): High energy/angle value of the scan
scan_time (float): Time for a half oscillation
"""
move_type = self.move_type.get()
if move_type == MoveType.ENERGY:
self.scan_settings.s_scan_energy_lo.put(low)
self.scan_settings.s_scan_energy_hi.put(high)
else:
self.scan_settings.s_scan_angle_lo.put(low)
self.scan_settings.s_scan_angle_hi.put(high)
self.scan_settings.s_scan_scantime.put(scan_time)
def set_xrd_settings(
self,
enable_low: bool,
enable_high: bool,
num_trigger_low: int,
num_trigger_high: int,
exp_time_low: int,
exp_time_high: int,
cycle_low: int,
cycle_high: int,
) -> None:
"""Set XRD settings for the upcoming scan.
Args:
enable_low (bool): Enable XRD for low energy/angle
enable_high (bool): Enable XRD for high energy/angle
num_trigger_low (int): Number of triggers for low energy/angle
num_trigger_high (int): Number of triggers for high energy/angle
exp_time_low (int): Exposure time for low energy/angle
exp_time_high (int): Exposure time for high energy/angle
cycle_low (int): Cycle for low energy/angle
cycle_high (int): Cycle for high energy/angle
"""
self.scan_settings.xrd_enable_hi_enum.put(int(enable_high))
self.scan_settings.xrd_enable_lo_enum.put(int(enable_low))
self.scan_settings.xrd_n_trigger_hi.put(num_trigger_high)
self.scan_settings.xrd_n_trigger_lo.put(num_trigger_low)
self.scan_settings.xrd_time_hi.put(exp_time_high)
self.scan_settings.xrd_time_lo.put(exp_time_low)
self.scan_settings.xrd_every_n_hi.put(cycle_high)
self.scan_settings.xrd_every_n_lo.put(cycle_low)
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
"""Set the scan control settings for the upcoming scan.
Args:
mode (ScanControlMode): Mode for the scan, either simple or advanced
scan_duration (float): Duration of the scan
"""
val = ScanControlMode(mode).value
self.scan_control.scan_mode_enum.put(val)
self.scan_control.scan_duration.put(scan_duration)
def _update_scan_parameter(self):
"""Get the scaninfo parameters for the scan."""
for key, value in self.scaninfo.scan_msg.content["info"]["kwargs"].items():
if hasattr(self.scan_parameter, key):
setattr(self.scan_parameter, key, value)
# -------------- End MO1 Bragg specific methods -----------------#
# -------------- Flyer Interface methods -----------------#
def kickoff(self):
"""Kickoff the device, called from BEC."""
scan_duration = self.scan_control.scan_duration.get()
# TODO implement better logic for infinite scans, at least bring it up with Debye
start_func = (
self.scan_control.scan_start_infinite.put
if scan_duration < 0.1
else self.scan_control.scan_start_timer.put
)
start_func(1)
status = self.wait_with_status(
signal_conditions=[(self.scan_control.scan_status.get, ScanControlScanStatus.RUNNING)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
)
return status
def stage(self) -> list[object]:
"""
Stage the device in preparation for a scan.
Returns:
list(object): list of objects that were staged
"""
if self._staged != Staged.no:
return super().stage()
self._stopped = False
self.scaninfo.load_scan_metadata()
self.on_stage()
return super().stage()
def _check_scan_msg(self, target_state: ScanControlLoadMessage) -> None:
"""Check if the scan message is gettting available
Args:
target_state (ScanControlLoadMessage): Target state to check for
Raises:
TimeoutError: If the scan message is not available after the timeout
"""
state = self.scan_control.scan_msg.get()
if state != target_state:
logger.warning(
f"Resetting scan validation in stage for state: {ScanControlLoadMessage(state)}, "
f"retry .get() on scan_control: {ScanControlLoadMessage(self.scan_control.scan_msg.get())} and sleeping 1s"
)
self.scan_control.scan_val_reset.put(1)
# Sleep to ensure the reset is done
time.sleep(1)
if not self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, target_state)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
raise TimeoutError(
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status,"
f" current state: {ScanControlScanStatus(self.scan_control.scan_msg.get())}"
)
def on_stage(self) -> None:
"""Actions to be executed when the device is staged."""
if not self.scaninfo.scan_type == "fly":
return
self._check_scan_msg(ScanControlLoadMessage.PENDING)
scan_name = self.scaninfo.scan_msg.content["info"].get("scan_name", "")
self._update_scan_parameter()
if scan_name == "xas_simple_scan":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_xrd_settings(
enable_low=False,
enable_high=False,
num_trigger_low=0,
num_trigger_high=0,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
elif scan_name == "xas_simple_scan_with_xrd":
self.set_xas_settings(
low=self.scan_parameter.start,
high=self.scan_parameter.stop,
scan_time=self.scan_parameter.scan_time,
)
self.set_xrd_settings(
enable_low=self.scan_parameter.xrd_enable_low,
enable_high=self.scan_parameter.xrd_enable_high,
num_trigger_low=self.scan_parameter.num_trigger_low,
num_trigger_high=self.scan_parameter.num_trigger_high,
exp_time_low=self.scan_parameter.exp_time_low,
exp_time_high=self.scan_parameter.exp_time_high,
cycle_low=self.scan_parameter.cycle_low,
cycle_high=self.scan_parameter.cycle_high,
)
self.set_scan_control_settings(
mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration
)
else:
raise Mo1BraggError(
f"Scan mode {scan_name} not implemented for scan_type={self.scaninfo.scan_type} on device {self.name}"
)
# Load the scan parameters to the controller
self.scan_control.scan_load.put(1)
# Wait for params to be checked from controller
if not self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.SUCCESS)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
raise TimeoutError(
f"Scan parameter validation run into timeout after {self.timeout_for_pvwait} with {ScanControlLoadMessage(self.scan_control.scan_msg.get())}"
)
def complete(self) -> DeviceStatus:
"""Complete the acquisition.
The method returns a DeviceStatus object that resolves to set_finished or set_exception once the acquisition is completed.
"""
status = self.on_complete()
if isinstance(status, DeviceStatus):
return status
status = DeviceStatus(self)
status.set_finished()
return status
def on_complete(self) -> DeviceStatus:
"""Specify actions to be performed for the completion of the acquisition."""
status = self.wait_with_status(
signal_conditions=[(self.scan_control.scan_done.get, 1)],
timeout=None,
check_stopped=True,
)
return status
def unstage(self) -> list[object]:
"""
Unstage device after a scan. It has to be possible to call this multiple times.
Returns:
list(object): list of objects that were unstaged
"""
self.check_scan_id()
self._stopped = False
self.on_unstage()
return super().unstage()
def on_unstage(self) -> None:
"""Actions to be executed when the device is unstaged.
The checks here ensure that the controller resets the Scan_msg to PENDING state."""
if self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
timeout=self.timeout_for_pvwait,
check_stopped=False,
):
return
self.scan_control.scan_val_reset.put(1)
if not self.wait_for_signals(
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
timeout=self.timeout_for_pvwait,
check_stopped=False,
):
raise TimeoutError(
f"Timeout after {self.timeout_for_pvwait} while waiting for scan validation"
)
# -------------- End Flyer Interface methods -----------------#
# -------------- Utility methods -----------------#
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
old_scan_id = self.scaninfo.scan_id
self.scaninfo.load_scan_metadata()
if self.scaninfo.scan_id != old_scan_id:
self._stopped = True
def wait_for_signals(
self,
signal_conditions: list[tuple],
timeout: float | None = None,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
) -> bool:
"""Wrapper around a list of conditions that allows waiting for them to become True.
For EPICs PVs, an example usage is pasted at the bottom.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked. The function relies on the self.stopped property to be set
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
Returns:
bool: True if all signals are in the desired state, False if timeout is reached
>>> Example usage for EPICS PVs:
>>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True)
"""
timer = 0
while True:
checks = [
get_current_state() == condition
for get_current_state, condition in signal_conditions
]
if check_stopped is True and self.stopped is True:
return False
if (all_signals and all(checks)) or (not all_signals and any(checks)):
return True
if timeout and timer > timeout:
return False
time.sleep(interval)
timer += interval
def wait_with_status(
self,
signal_conditions: list[tuple],
timeout: float | None = None,
check_stopped: bool = False,
interval: float = 0.05,
all_signals: bool = False,
exception_on_timeout: Exception = None,
) -> DeviceStatus:
"""Wrapper around wait_for_signals to be started in thread and attach a DeviceStatus object.
This allows BEC to perform actinos in parallel and not be blocked by method calls on a device.
Typically used for on_trigger, on_complete methods or also the kickoff.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
Returns:
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
"""
if exception_on_timeout is None:
exception_on_timeout = DeviceTimeoutError(
f"Timeout error for {self.name} while waiting for signals {signal_conditions}"
)
status = DeviceStatus(device=self)
def wait_for_signals_wrapper(
status: DeviceStatus,
signal_conditions: list[tuple],
timeout: float,
check_stopped: bool,
interval: float,
all_signals: bool,
exception_on_timeout: Exception = None,
):
"""Convenient wrapper around wait_for_signals to set status based on the result.
Args:
status (DeviceStatus): DeviceStatus object to be set
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
"""
try:
result = self.wait_for_signals(
signal_conditions, timeout, check_stopped, interval, all_signals
)
if result is True:
# pylint: disable=protected-access
status.set_finished()
else:
if self.stopped:
# INFO This will execute a callback to the parent device.stop() method
status.set_exception(exc=DeviceStopError(f"{self.name} was stopped"))
else:
# INFO This will execute a callback to the parent device.stop() method
status.set_exception(exc=exception_on_timeout)
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.warning(f"Error in wait_for_signals in {self.name}; Traceback: {content}")
# INFO This will execute a callback to the parent device.stop() method
status.set_exception(exc=exc)
thread = threading.Thread(
target=wait_for_signals_wrapper,
args=(
status,
signal_conditions,
timeout,
check_stopped,
interval,
all_signals,
exception_on_timeout,
),
daemon=True,
)
thread.start()
return status

View File

@@ -0,0 +1,9 @@
def patch_dual_pvs(device):
device.wait_for_connection(all_signals=True)
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv

View File

View File

@@ -0,0 +1 @@
from .mono_bragg_scans import XASSimpleScan, XASSimpleScanWithXRD

View File

@@ -0,0 +1,167 @@
""" This module contains the scan classes for the mono bragg motor of the Debye beamline."""
import time
import numpy as np
from bec_lib.device import DeviceBase
from bec_server.scan_server.scans import AsyncFlyScanBase
class XASSimpleScan(AsyncFlyScanBase):
scan_name = "xas_simple_scan"
scan_type = "fly"
scan_report_hint = "device_progress"
required_kwargs = []
use_scan_progress_report = False
pre_move = False
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration
is the duration of the scan. If scan duration is set to 0, the scan will run infinitely.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one scan cycle.
scan_duration (float): Duration of the scan.
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan(start=8000, stop=9000, scan_time=1, scan_duration=10)
"""
super().__init__(**kwargs)
self.motor = motor
self.start = start
self.stop = stop
self.scan_time = scan_time
self.scan_duration = scan_duration
self.primary_readout_cycle = 1
def prepare_positions(self):
"""Prepare the positions for the scan.
Use here only start and end energy defining the range for the scan.
"""
self.positions = np.array([self.start, self.stop], dtype=float)
self.num_pos = None
yield None
def pre_scan(self):
"""Pre Scan action. Ensure the motor movetype is set to energy, then check limits for start/end energy.
#TODO Remove once the motor movetype is removed and ANGLE motion is a pseudo motor.
"""
yield from self.stubs.send_rpc_and_wait(self.motor, "move_type.set", "energy")
self._check_limits()
# Ensure parent class pre_scan actions to be called.
super().pre_scan()
def scan_report_instructions(self):
"""
Return the instructions for the scan report.
"""
yield from self.stubs.scan_report_instruction({"device_progress": [self.motor]})
def scan_core(self):
"""Run the scan core.
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
"""
# Start the oscillation on the Bragg motor.
yield from self.stubs.kickoff(device=self.motor)
yield from self.stubs.complete(device=self.motor)
# Get the target DIID (instruction number) for the stubs.complete call
target_diid = self.DIID - 1
while True:
# Readout monitored devices
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary")
# Check if complete call on Mo1 Bragg has been finished
status = self.stubs.get_req_status(
device=self.motor, RID=self.metadata["RID"], DIID=target_diid
)
if status:
break
time.sleep(self.primary_readout_cycle)
self.point_id += 1
self.num_pos = self.point_id + 1
class XASSimpleScanWithXRD(XASSimpleScan):
scan_name = "xas_simple_scan_with_xrd"
gui_config = {
"Movement Parameters": ["start", "stop"],
"Scan Parameters": ["scan_time", "scan_duration"],
"Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"],
"High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"],
}
def __init__(
self,
start: float,
stop: float,
scan_time: float,
scan_duration: float,
xrd_enable_low: bool,
num_trigger_low: int,
exp_time_low: float,
cycle_low: int,
xrd_enable_high: bool,
num_trigger_high: int,
exp_time_high: float,
cycle_high: float,
motor: DeviceBase = "mo1_bragg",
**kwargs,
):
"""The xas_simple_scan_with_xrd is an oscillation motion on the mono motor
with XRD triggering at low and high energy ranges.
If scan duration is set to 0, the scan will run infinitely.
Args:
start (float): Start energy for the scan.
stop (float): Stop energy for the scan.
scan_time (float): Time for one oscillation .
scan_duration (float): Total duration of the scan.
xrd_enable_low (bool): Enable XRD triggering for the low energy range.
num_trigger_low (int): Number of triggers for the low energy range.
exp_time_low (float): Exposure time for the low energy range.
cycle_low (int): Specify how often the triggers should be considered, every nth cycle for low
xrd_enable_high (bool): Enable XRD triggering for the high energy range.
num_trigger_high (int): Number of triggers for the high energy range.
exp_time_high (float): Exposure time for the high energy range.
cycle_high (int): Specify how often the triggers should be considered, every nth cycle for high
motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
"""
super().__init__(
start=start,
stop=stop,
scan_time=scan_time,
scan_duration=scan_duration,
motor=motor,
**kwargs,
)
self.xrd_enable_low = xrd_enable_low
self.num_trigger_low = num_trigger_low
self.exp_time_low = exp_time_low
self.cycle_low = cycle_low
self.xrd_enable_high = xrd_enable_high
self.num_trigger_high = num_trigger_high
self.exp_time_high = exp_time_high
self.cycle_high = cycle_high

View File

@@ -0,0 +1,33 @@
"""
SCAN PLUGINS
All new scans should be derived from ScanBase. ScanBase provides various methods that can be customized and overriden
but they are executed in a specific order:
- self.initialize # initialize the class if needed
- self.read_scan_motors # used to retrieve the start position (and the relative position shift if needed)
- self.prepare_positions # prepare the positions for the scan. The preparation is split into multiple sub fuctions:
- self._calculate_positions # calculate the positions
- self._set_positions_offset # apply the previously retrieved scan position shift (if needed)
- self._check_limits # tests to ensure the limits won't be reached
- self.open_scan # send an open_scan message including the scan name, the number of points and the scan motor names
- self.stage # stage all devices for the upcoming acquisiton
- self.run_baseline_readings # read all devices to get a baseline for the upcoming scan
- self.pre_scan # perform additional actions before the scan starts
- self.scan_core # run a loop over all position
- self._at_each_point(ind, pos) # called at each position with the current index and the target positions as arguments
- self.finalize # clean up the scan, e.g. move back to the start position; wait everything to finish
- self.unstage # unstage all devices that have been staged before
- self.cleanup # send a close scan message and perform additional cleanups if needed
"""
# import time
# import numpy as np
# from bec_lib import bec_logger, messages
# from bec_lib.endpoints import MessageEndpoints
# from bec_server.scan_server.errors import ScanAbortion
# from bec_server.scan_server.scans import FlyScanBase, RequestBase, ScanArgType, ScanBase
# logger = bec_logger.logger

View File

@@ -7,10 +7,12 @@ import traceback
import h5py
import numpy as np
from bec_lib import MessageEndpoints, RedisConnector, ServiceConfig, bec_logger, messages
from bec_lib import bec_logger, messages
from bec_lib.bec_service import BECService
from bec_lib.file_utils import FileWriterMixin
from bec_lib.redis_connector import MessageObject
from bec_lib.endpoints import MessageEndpoints
from bec_lib.file_utils import FileWriter
from bec_lib.redis_connector import MessageObject, RedisConnector
from bec_lib.service_config import ServiceConfig
logger = bec_logger.logger
@@ -27,7 +29,7 @@ class NIDAQWriterService(BECService):
super().__init__(config=config, connector_cls=connector_cls, unique_service=True)
self.queue = queue.Queue()
config = self._service_config.service_config.get("file_writer")
self.writer_mixin = FileWriterMixin(config)
self.writer_mixin = FileWriter(service_config=config)
self._scan_status_consumer = None
self._ni_data_consumer = None
self._ni_data_event = None
@@ -159,7 +161,7 @@ class NIDAQWriterService(BECService):
signals = {}
for key in msgs[0].content["signals"]:
signals[key] = np.concatenate([msg.content["signals"][key] for msg in msgs])
signals[key] = np.concatenate([msg.content["signals"][key]["value"] for msg in msgs])
# write data to queue
self.queue.put(signals)

View File

@@ -1,4 +1,6 @@
from bec_lib import MessageEndpoints, RedisConnector, messages
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
def send_scan_status(scan_number, status):
@@ -24,19 +26,9 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Scan status helper")
command = parser.add_subparsers(dest="command")
start = command.add_parser("start", help="Start a new scan")
start.add_argument(
"--scan_number",
type=int,
required=True,
help="Scan number",
)
start.add_argument("--scan_number", type=int, required=True, help="Scan number")
stop = command.add_parser("stop", help="Stop the scan")
stop.add_argument(
"--scan_number",
type=int,
required=True,
help="Scan number",
)
stop.add_argument("--scan_number", type=int, required=True, help="Scan number")
args = parser.parse_args()
send_scan_status(args.scan_number, args.command)

View File

@@ -2,7 +2,8 @@ import threading
import time
import numpy as np
from bec_lib import MessageEndpoints, RedisConnector, ServiceConfig, bec_logger, messages
from bec_lib import messages
from bec_lib.redis_connector import RedisConnector
class NIDAQSim(threading.Thread):
@@ -13,10 +14,7 @@ class NIDAQSim(threading.Thread):
index = 0
producer = RedisConnector(["localhost:6379"]).producer()
signal = np.asarray(range(index, index + 600000))
signals = {
"signal1": signal,
"signal2": signal,
}
signals = {"signal1": signal, "signal2": signal}
msg = messages.DeviceMessage(signals=signals)
msg = msg.dumps()
@@ -40,9 +38,6 @@ class NIDAQSim(threading.Thread):
time.sleep(0.5)
print(f"Elapsed time: {time.time() - start}")
print(f"Total time: {time.time() - total_time}")
print(f"FPS: {index / (time.time() - total_time)}")
print(f"Signal size: {signal.nbytes/1e6*2} MB")
if __name__ == "__main__":

View File

View File

@@ -1,15 +1,14 @@
import argparse
import threading
from bec_lib import RedisConnector, ServiceConfig, bec_logger
from NIDAQ_writer import NIDAQWriterService
from bec_lib import bec_logger
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig
from debye_bec.services.NIDAQ_writer import NIDAQWriterService
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
"--config",
default="",
help="path to the config file",
)
parser.add_argument("--config", default="", help="path to the config file")
clargs = parser.parse_args()
config_path = clargs.config
@@ -17,10 +16,7 @@ config = ServiceConfig(config_path)
bec_logger.level = bec_logger.LOGLEVEL.INFO
logger = bec_logger.logger
bec_server = NIDAQWriterService(
config=config,
connector_cls=RedisConnector,
)
bec_server = NIDAQWriterService(config=config, connector_cls=RedisConnector)
try:
event = threading.Event()
# pylint: disable=E1102

70
pyproject.toml Normal file
View File

@@ -0,0 +1,70 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "debye_bec"
version = "0.0.0"
description = "Custom device implementations based on the ophyd hardware abstraction layer"
requires-python = ">=3.10"
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = ["numpy", "bec_lib", "h5py", "ophyd_devices"]
[project.optional-dependencies]
dev = [
"bec_server",
"black",
"isort",
"coverage",
"pylint",
"pytest",
"pytest-random-order",
]
[project.entry-points."bec"]
plugin_bec = "debye_bec"
[project.entry-points."bec.deployment.device_server"]
plugin_ds_startup = "debye_bec.deployments.device_server.startup:run"
[project.entry-points."bec.file_writer"]
plugin_file_writer = "debye_bec.file_writer"
[project.entry-points."bec.scans"]
plugin_scans = "debye_bec.scans"
[project.entry-points."bec.ipython_client_startup"]
plugin_ipython_client_pre = "debye_bec.bec_ipython_client.startup.pre_startup"
plugin_ipython_client_post = "debye_bec.bec_ipython_client.startup"
[project.entry-points."bec.widgets"]
plugin_widgets = "debye_bec.bec_widgets"
[tool.hatch.build.targets.wheel]
include = ["*"]
[tool.isort]
profile = "black"
line_length = 100
multi_line_output = 3
include_trailing_comma = true
[tool.black]
line-length = 100
skip-magic-trailing-comma = true
[tool.pylint.basic]
# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
good-names-rgxs = [
".*scanID.*",
".*RID.*",
".*pointID.*",
".*ID.*",
".*_2D.*",
".*_1D.*",
]

View File

@@ -1,21 +0,0 @@
[metadata]
name = bec_plugins
description = BEC plugins to modify the behaviour of services within the BEC framework
long_description = file: README.md
long_description_content_type = text/markdown
url = https://gitlab.psi.ch/bec/bec
project_urls =
Bug Tracker = https://gitlab.psi.ch/bec/bec/issues
classifiers =
Programming Language :: Python :: 3
Development Status :: 3 - Alpha
Topic :: Scientific/Engineering
[options]
package_dir =
= .
packages = find:
python_requires = >=3.10
[options.packages.find]
where = .

View File

@@ -1,7 +0,0 @@
from setuptools import setup
if __name__ == "__main__":
setup(
install_requires=["numpy", "h5py", "bec-lib"],
extras_require={"dev": ["pytest", "pytest-random-order", "pytest-redis", "coverage"]},
)

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,436 @@
# pylint: skip-file
import os
import threading
import time
from dataclasses import fields
from unittest import mock
import ophyd
import pytest
from bec_lib.messages import ScanQueueMessage, ScanStatusMessage
from bec_server.scan_server.scan_assembler import ScanAssembler
from bec_server.scan_server.scan_queue import RequestBlock
from bec_server.scan_server.scan_worker import ScanWorker
from bec_server.scan_server.tests.fixtures import scan_server_mock
from ophyd.utils import LimitError
from ophyd_devices.tests.utils import MockPV
# from bec_server.device_server.tests.utils import DMMock
from debye_bec.devices.mo1_bragg import (
Mo1Bragg,
Mo1BraggError,
MoveType,
ScanControlLoadMessage,
ScanControlMode,
ScanControlScanStatus,
)
# TODO move this function to ophyd_devices, it is duplicated in csaxs_bec and needed for other pluging repositories
from debye_bec.devices.test_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def scan_worker_mock(scan_server_mock):
scan_server_mock.device_manager.connector = mock.MagicMock()
scan_worker = ScanWorker(parent=scan_server_mock)
yield scan_worker
@pytest.fixture(scope="function")
def mock_bragg():
name = "bragg"
prefix = "X01DA-OP-MO1:BRAGG:"
with (
mock.patch.object(ophyd, "cl") as mock_cl,
mock.patch("debye_bec.devices.mo1_bragg.Mo1Bragg", "_on_init"),
):
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = Mo1Bragg(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_init(mock_bragg):
dev = mock_bragg
assert dev.name == "bragg"
assert dev.prefix == "X01DA-OP-MO1:BRAGG:"
assert dev.move_type.get() == MoveType.ENERGY
assert dev.crystal.offset_si111._read_pvname == "X01DA-OP-MO1:BRAGG:offset_si111_RBV"
assert dev.move_abs._read_pvname == "X01DA-OP-MO1:BRAGG:move_abs.PROC"
def test_check_value(mock_bragg):
dev = mock_bragg
dev.low_lim._read_pv.mock_data = 0
dev.high_lim._read_pv.mock_data = 1
dev.low_limit_angle._read_pv.mock_data = 10
dev.high_limit_angle._read_pv.mock_data = 20
# Check that limits are taken correctly from angle or energy
# Energy first
move_type = MoveType.ENERGY
dev.move_type.set(move_type)
# nothing happens
dev.check_value(0.5)
with pytest.raises(LimitError):
dev.check_value(15)
# Angle next
move_type = MoveType.ANGLE
dev.move_type.set(move_type)
dev.check_value(15)
with pytest.raises(LimitError):
dev.check_value(0.5)
def test_egu(mock_bragg):
dev = mock_bragg
assert dev.egu == "eV"
dev.move_type.set(MoveType.ANGLE)
assert dev.egu == "deg"
def test_move_succeeds(mock_bragg):
dev = mock_bragg
dev.move_abs._read_pv.mock_data = 0
# Move succeeds
with mock.patch.object(dev.motor_is_moving._read_pv, "mock_data", side_effect=[0, 1]):
status = dev.move(0.5)
# Sleep needed to allow thread to resolive in _move_and_finish, i.e. and the 0.25s sleep inside the function
time.sleep(1)
assert status.done is True
assert status.success is True
assert dev.setpoint.get() == 0.5
assert dev.move_abs.get() == 1
def test_stop_move(mock_bragg):
dev = mock_bragg
dev.move_abs._read_pv.mock_data = 0
dev.motor_is_moving._read_pv.mock_data = 0
# Move fails
status = dev.move(0.5)
time.sleep(1)
assert status.done is False
assert dev._stopped == False
dev.stop()
time.sleep(0.5)
assert dev._stopped == True
assert status.done is True
assert status.success is False
def test_set_xtal(mock_bragg):
dev = mock_bragg
dev.set_xtal("111")
# Default values for mock
assert dev.crystal.offset_si111.get() == 0
assert dev.crystal.offset_si311.get() == 0
assert dev.crystal.d_spacing_si111.get() == 0
assert dev.crystal.d_spacing_si311.get() == 0
assert dev.crystal.xtal_enum.get() == 0
dev.set_xtal("311", offset_si111=1, offset_si311=2, d_spacing_si111=3, d_spacing_si311=4)
assert dev.crystal.offset_si111.get() == 1
assert dev.crystal.offset_si311.get() == 2
assert dev.crystal.d_spacing_si111.get() == 3
assert dev.crystal.d_spacing_si311.get() == 4
assert dev.crystal.xtal_enum.get() == 1
def test_set_xas_settings(mock_bragg):
dev = mock_bragg
dev.move_type.set(MoveType.ENERGY)
dev.set_xas_settings(low=0.5, high=1, scan_time=0.1)
assert dev.scan_settings.s_scan_energy_lo.get() == 0.5
assert dev.scan_settings.s_scan_energy_hi.get() == 1
assert dev.scan_settings.s_scan_scantime.get() == 0.1
dev.move_type.set(MoveType.ANGLE)
dev.set_xas_settings(low=10, high=20, scan_time=1)
assert dev.scan_settings.s_scan_angle_lo.get() == 10
assert dev.scan_settings.s_scan_angle_hi.get() == 20
assert dev.scan_settings.s_scan_scantime.get() == 1
def test_set_xrd_settings(mock_bragg):
dev = mock_bragg
dev.set_xrd_settings(
enable_low=True,
enable_high=False,
num_trigger_low=1,
num_trigger_high=7,
exp_time_low=1,
exp_time_high=3,
cycle_low=1,
cycle_high=5,
)
assert dev.scan_settings.xrd_enable_lo_enum.get() == True
assert dev.scan_settings.xrd_enable_hi_enum.get() == False
assert dev.scan_settings.xrd_n_trigger_lo.get() == 1
assert dev.scan_settings.xrd_n_trigger_hi.get() == 7
assert dev.scan_settings.xrd_time_lo.get() == 1
assert dev.scan_settings.xrd_time_hi.get() == 3
assert dev.scan_settings.xrd_every_n_lo.get() == 1
assert dev.scan_settings.xrd_every_n_hi.get() == 5
def test_set_control_settings(mock_bragg):
dev = mock_bragg
dev.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=10)
assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.SIMPLE
assert dev.scan_control.scan_duration.get() == 10
dev.set_scan_control_settings(mode=ScanControlMode.ADVANCED, scan_duration=5)
assert dev.scan_control.scan_mode_enum.get() == ScanControlMode.ADVANCED
assert dev.scan_control.scan_duration.get() == 5
def test_update_scan_parameters(mock_bragg):
dev = mock_bragg
msg = ScanStatusMessage(
scan_id="my_scan_id",
status="closed",
info={
"kwargs": {
"start": 0,
"stop": 5,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
"xrd_enable_high": False,
"num_trigger_low": 1,
"num_trigger_high": 7,
"exp_time_low": 1,
"exp_time_high": 3,
"cycle_low": 1,
"cycle_high": 5,
}
},
metadata={},
)
mock_bragg.scaninfo.scan_msg = msg
for field in fields(dev.scan_parameter):
assert getattr(dev.scan_parameter, field.name) == None
dev._update_scan_parameter()
for field in fields(dev.scan_parameter):
assert getattr(dev.scan_parameter, field.name) == msg.content["info"]["kwargs"].get(
field.name, None
)
def test_kickoff_scan(mock_bragg):
dev = mock_bragg
dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.READY
dev.scan_control.scan_duration._read_pv.mock_data = 1
dev.scan_control.scan_start_timer._read_pv.mock_data = 0
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
status = dev.kickoff()
assert status.done is False
dev.scan_control.scan_status._read_pv.mock_data = ScanControlScanStatus.RUNNING
time.sleep(0.2)
assert status.done is True
assert dev.scan_control.scan_start_timer.get() == 1
dev.scan_control.scan_duration._read_pv.mock_data = 0
dev.scan_control.scan_start_timer._read_pv.mock_data = 0
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
dev.scan_control.scan_start_infinite._read_pv.mock_data = 0
status = dev.kickoff()
dev.kickoff()
assert dev.scan_control.scan_start_infinite.get() == 1
def test_complete(mock_bragg):
dev = mock_bragg
dev.scan_control.scan_done._read_pv.mock_data = 0
# Normal case
status = dev.complete()
assert status.done is False
assert status.success is False
dev.scan_control.scan_done._read_pv.mock_data = 1
time.sleep(0.2)
assert status.done is True
assert status.success is True
# Stop called case
dev.scan_control.scan_done._read_pv.mock_data = 0
status = dev.complete()
assert status.done is False
assert status.success is False
dev.stop()
time.sleep(0.2)
assert status.done is True
assert status.success is False
def test_unstage(mock_bragg):
mock_bragg.timeout_for_pvwait = 0.5
mock_bragg.scan_control.scan_val_reset._read_pv.mock_data = 0
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.PENDING
with mock.patch.object(mock_bragg.scan_control.scan_val_reset, "put") as mock_put:
mock_bragg.unstage()
assert mock_put.call_count == 0
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
with pytest.raises(TimeoutError):
mock_bragg.unstage()
assert mock_put.call_count == 1
@pytest.mark.parametrize(
"msg",
[
ScanQueueMessage(
scan_type="monitor_scan",
parameter={
"args": {},
"kwargs": {"device": "mo1_bragg", "start": 0, "stop": 10, "relative": True},
"num_points": 100,
},
queue="primary",
metadata={"RID": "test1234"},
),
ScanQueueMessage(
scan_type="xas_simple_scan",
parameter={
"args": {},
"kwargs": {
"motor": "mo1_bragg",
"start": 0,
"stop": 10,
"scan_time": 1,
"scan_duration": 10,
},
"num_points": 100,
},
queue="primary",
metadata={"RID": "test1234"},
),
ScanQueueMessage(
scan_type="xas_simple_scan_with_xrd",
parameter={
"args": {},
"kwargs": {
"motor": "mo1_bragg",
"start": 0,
"stop": 10,
"scan_time": 1,
"scan_duration": 10,
"xrd_enable_low": True,
"xrd_enable_high": False,
"num_trigger_low": 1,
"num_trigger_high": 7,
"exp_time_low": 1,
"exp_time_high": 3,
"cycle_low": 1,
"cycle_high": 5,
},
"num_points": 10,
},
queue="primary",
metadata={"RID": "test1234"},
),
],
)
def test_stage(mock_bragg, scan_worker_mock, msg):
"""This test is important to check that the stage method of the device is working correctly.
Changing the kwargs names in the scans is tightly linked to the logic on the device, thus
it is important to check that the stage method is working correctly for the current implementation.
Therefor, this test creates a scaninfo message using the scan.open_scan() method to always check
agains the currently implemented scans vs. the logic on the device"""
# Create a scaninfo message using scans the ScanQueueMessages above, 3 cases of fly scan; for the general case the procedure is not defined yet
worker = scan_worker_mock
scan_server = worker.parent
rb = RequestBlock(msg, assembler=ScanAssembler(parent=scan_server))
with mock.patch.object(worker, "current_instruction_queue_item"):
worker.scan_motors = []
worker.readout_priority = {
"monitored": [],
"baseline": [],
"async": [],
"continuous": [],
"on_request": [],
}
open_scan_msg = list(rb.scan.open_scan())[0]
worker._initialize_scan_info(rb, open_scan_msg, msg.content["parameter"].get("num_points"))
scan_status_msg = ScanStatusMessage(
scan_id="test1234", status="closed", info=worker.current_scan_info, metadata={}
)
mock_bragg.scaninfo.scan_msg = scan_status_msg
# Ensure that ScanControlLoadMessage is set to SUCCESS
mock_bragg.scan_control.scan_msg._read_pv.mock_data = ScanControlLoadMessage.SUCCESS
with (
mock.patch.object(mock_bragg.scaninfo, "load_scan_metadata") as mock_load_scan_metadata,
mock.patch.object(mock_bragg, "_check_scan_msg") as mock_check_scan_msg,
mock.patch.object(mock_bragg, "on_unstage"),
):
scan_name = scan_status_msg.content["info"].get("scan_name", "")
# Chek the not implemented fly scan first, should raise Mo1BraggError
if scan_name not in ["xas_simple_scan", "xas_simple_scan_with_xrd"]:
with pytest.raises(Mo1BraggError):
mock_bragg.stage()
assert mock_check_scan_msg.call_count == 1
assert mock_load_scan_metadata.call_count == 1
else:
with (
mock.patch.object(mock_bragg, "set_xas_settings") as mock_xas_settings,
mock.patch.object(mock_bragg, "set_xrd_settings") as mock_xrd_settings,
mock.patch.object(
mock_bragg, "set_scan_control_settings"
) as mock_set_scan_control_settings,
):
# Check xas_simple_scan
if scan_name == "xas_simple_scan":
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=False,
enable_high=False,
num_trigger_low=0,
num_trigger_high=0,
exp_time_low=0,
exp_time_high=0,
cycle_low=0,
cycle_high=0,
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)
# Check xas_simple_scan_with_xrd
elif scan_name == "xas_simple_scan_with_xrd":
mock_bragg.stage()
assert mock_xas_settings.call_args == mock.call(
low=scan_status_msg.content["info"]["kwargs"]["start"],
high=scan_status_msg.content["info"]["kwargs"]["stop"],
scan_time=scan_status_msg.content["info"]["kwargs"]["scan_time"],
)
assert mock_xrd_settings.call_args == mock.call(
enable_low=scan_status_msg.content["info"]["kwargs"]["xrd_enable_low"],
enable_high=scan_status_msg.content["info"]["kwargs"][
"xrd_enable_high"
],
num_trigger_low=scan_status_msg.content["info"]["kwargs"][
"num_trigger_low"
],
num_trigger_high=scan_status_msg.content["info"]["kwargs"][
"num_trigger_high"
],
exp_time_low=scan_status_msg.content["info"]["kwargs"]["exp_time_low"],
exp_time_high=scan_status_msg.content["info"]["kwargs"][
"exp_time_high"
],
cycle_low=scan_status_msg.content["info"]["kwargs"]["cycle_low"],
cycle_high=scan_status_msg.content["info"]["kwargs"]["cycle_high"],
)
assert mock_set_scan_control_settings.call_args == mock.call(
mode=ScanControlMode.SIMPLE,
scan_duration=scan_status_msg.content["info"]["kwargs"][
"scan_duration"
],
)

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,303 @@
# pylint: skip-file
from unittest import mock
from bec_lib.messages import DeviceInstructionMessage
from bec_server.device_server.tests.utils import DMMock
from debye_bec.scans import XASSimpleScan, XASSimpleScanWithXRD
def test_xas_simple_scan():
# create a fake device manager that we can use to add devices
device_manager = DMMock()
device_manager.add_device("mo1_bragg")
request = XASSimpleScan(
start=0, stop=5, scan_time=1, scan_duration=10, device_manager=device_manager
)
request.metadata["RID"] = "my_test_request_id"
with (
mock.patch.object(request.stubs, "get_req_status", side_effect=[False, True]),
mock.patch.object(request.stubs, "_get_from_rpc", return_value=True),
):
reference_commands = list(request.run())
for cmd in reference_commands:
if not cmd:
continue
if "RID" in cmd.metadata:
cmd.metadata["RID"] = "my_test_request_id"
if "rpc_id" in cmd.parameter:
cmd.parameter["rpc_id"] = "my_test_rpc_id"
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 0, "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 1, "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
"num_points": None,
"positions": [0.0, 5.0],
"scan_name": "xas_simple_scan",
"scan_type": "fly",
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 2, "RID": "my_test_request_id"},
device=None,
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "DIID": 3, "RID": "my_test_request_id"},
device=None,
action="baseline_reading",
parameter={},
),
DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"DIID": 4,
"RID": "my_test_request_id",
"response": True,
},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 5, "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}, "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 6, "RID": "my_test_request_id"},
device="mo1_bragg",
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id"},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 8, "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id"},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 10, "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 11, "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 12, "RID": "my_test_request_id"},
device=None,
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 13, "RID": "my_test_request_id"},
device=None,
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 14, "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
def test_xas_simple_scan_with_xrd():
# create a fake device manager that we can use to add devices
device_manager = DMMock()
device_manager.add_device("mo1_bragg")
request = XASSimpleScanWithXRD(
start=0,
stop=5,
scan_time=1,
scan_duration=10,
xrd_enable_low=True,
num_trigger_low=1,
exp_time_low=1,
cycle_low=1,
xrd_enable_high=True,
num_trigger_high=2,
exp_time_high=3,
cycle_high=4,
device_manager=device_manager,
)
request.metadata["RID"] = "my_test_request_id"
with (
mock.patch.object(request.stubs, "get_req_status", side_effect=[False, True]),
mock.patch.object(request.stubs, "_get_from_rpc", return_value=True),
):
reference_commands = list(request.run())
for cmd in reference_commands:
if not cmd:
continue
if "RID" in cmd.metadata:
cmd.metadata["RID"] = "my_test_request_id"
if "rpc_id" in cmd.parameter:
cmd.parameter["rpc_id"] = "my_test_rpc_id"
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 0, "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 1, "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
"num_points": None,
"positions": [0.0, 5.0],
"scan_name": "xas_simple_scan_with_xrd",
"scan_type": "fly",
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 2, "RID": "my_test_request_id"},
device=None,
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "DIID": 3, "RID": "my_test_request_id"},
device=None,
action="baseline_reading",
parameter={},
),
DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"DIID": 4,
"RID": "my_test_request_id",
"response": True,
},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 5, "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}, "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 6, "RID": "my_test_request_id"},
device="mo1_bragg",
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 7, "RID": "my_test_request_id"},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 8, "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 9, "RID": "my_test_request_id"},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 10, "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 11, "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 12, "RID": "my_test_request_id"},
device=None,
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 13, "RID": "my_test_request_id"},
device=None,
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "DIID": 14, "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -2,10 +2,12 @@ from unittest import mock
import numpy as np
import pytest
from bec_lib import MessageEndpoints, ServiceConfig, messages
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import MessageObject
from bec_lib.service_config import ServiceConfig
from bec_plugins.services.NIDAQ_writer import NIDAQWriterService
from debye_bec.services.NIDAQ_writer import NIDAQWriterService
def test_nidaq_starts_consumers():
@@ -32,8 +34,7 @@ class NIWriterMock(NIDAQWriterService):
@pytest.fixture(scope="function")
def nidaq():
service = NIWriterMock(
config=ServiceConfig(redis={"host": "test", "port": 6379}),
connector_cls=mock.MagicMock(),
config=ServiceConfig(redis={"host": "test", "port": 6379}), connector_cls=mock.MagicMock()
)
yield service
@@ -46,11 +47,8 @@ def test_nidaq_scan_status_consumer(nidaq):
def test_scan_status_callback(nidaq):
scan_status_msg = messages.ScanStatusMessage(scanID="test", status="open", info={})
msg_obj = MessageObject(
topic="test",
value=scan_status_msg.dumps(),
)
scan_status_msg = messages.ScanStatusMessage(scan_id="test", status="open", info={})
msg_obj = MessageObject(topic="test", value=scan_status_msg.dumps())
with mock.patch.object(nidaq, "handle_scan_status") as mock_handle:
nidaq._scan_status_callback(msg_obj, nidaq)
mock_handle.assert_called_once_with(scan_status_msg)
@@ -83,10 +81,10 @@ def test_nidaq_reads_data_from_strea(nidaq):
mock_handle.assert_called_once()
@pytest.mark.parametrize("scan_status", ["open", "closed", "aborted", "halted", None])
@pytest.mark.parametrize("scan_status", ["open", "closed", "aborted", "halted"])
def test_nidaq_handle_scan_status(nidaq, scan_status):
scan_status_msg = messages.ScanStatusMessage(
scanID="test", status=scan_status, info={"scan_number": 5}
scan_id="test", status=scan_status, info={"scan_number": 5}
)
nidaq.handle_scan_status(scan_status_msg)
if scan_status == "open":
@@ -99,9 +97,14 @@ def test_nidaq_handle_scan_status(nidaq, scan_status):
def test_nidaq_handle_ni_data(nidaq):
data = [
messages.DeviceMessage(signals={"signal1": list(range(10)), "signal2": list(range(10))}),
messages.DeviceMessage(
signals={"signal1": list(range(10, 20)), "signal2": list(range(10, 20))}
signals={"signal1": {"value": list(range(10))}, "signal2": {"value": list(range(10))}}
),
messages.DeviceMessage(
signals={
"signal1": {"value": list(range(10, 20))},
"signal2": {"value": list(range(10, 20))},
}
),
]
@@ -114,7 +117,7 @@ def test_nidaq_handle_ni_data(nidaq):
def test_nidaq_write_data_without_filename(nidaq):
signal = {"signal1": np.asarray(range(20)), "signal2": np.asarray(range(20))}
with mock.patch("bec_plugins.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
with mock.patch("debye_bec.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
nidaq.write_data(signal)
mock_h5py.File.assert_not_called()
@@ -122,7 +125,7 @@ def test_nidaq_write_data_without_filename(nidaq):
def test_nidaq_write_data_with_filename(nidaq):
signal = {"signal1": np.asarray(range(20)), "signal2": np.asarray(range(20))}
nidaq.filename = "test.h5"
with mock.patch("bec_plugins.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
with mock.patch("debye_bec.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
nidaq.write_data(signal)
mock_h5py.File.assert_called_once_with("test.h5", "a")
@@ -131,7 +134,7 @@ def test_nidaq_write_data_reshape(nidaq):
signal = {"signal1": np.asarray(range(20)), "signal2": np.asarray(range(20))}
nidaq.filename = "test.h5"
nidaq.reshape_dataset = True
with mock.patch("bec_plugins.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
with mock.patch("debye_bec.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
nidaq.write_data(signal)
mock_h5py.File.assert_called_once_with("test.h5", "a")
@@ -140,7 +143,7 @@ def test_nidaq_write_data_without_reshape(nidaq):
signal = {"signal1": np.asarray(range(20)), "signal2": np.asarray(range(20))}
nidaq.filename = "test.h5"
nidaq.reshape_dataset = False
with mock.patch("bec_plugins.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
with mock.patch("debye_bec.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
nidaq.write_data(signal)
mock_h5py.File.assert_called_once_with("test.h5", "a")
file_handle = mock_h5py.File().__enter__()
@@ -161,7 +164,7 @@ def test_nidaq_write_data_reshapes_data(nidaq):
signal = {"signal1": np.asarray(range(20)), "signal2": np.asarray(range(20))}
nidaq.filename = "test.h5"
nidaq.reshape_dataset = True
with mock.patch("bec_plugins.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
with mock.patch("debye_bec.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
file_handle = mock_h5py.File().__enter__()
file_handle.__contains__.side_effect = signal.__contains__
nidaq.write_data(signal)