101 Commits

Author SHA1 Message Date
Unknown MX Person
d1844eaeba WIP 2025-03-25 17:42:55 +01:00
gac-x06da
d53ec4c14c WIP 2025-03-21 09:57:25 +01:00
gac-x06da
4091f11b0a WIP 2025-03-21 09:27:29 +01:00
gac-x06da
29ae5c196b WIP 2025-03-14 16:10:16 +01:00
gac-x06da
74521da7b3 Blacking 2025-03-14 12:40:23 +01:00
gac-x06da
a5f844b816 Keyword scans for 2D 2025-03-14 12:32:57 +01:00
gac-x06da
34d4d6ef8c Added AttributeError to updates 2025-03-14 12:32:57 +01:00
ci_update_bot
19b2299840 docs: Update device list 2025-03-11 12:29:00 +00:00
gac-x06da
d18c099058 WIP 2025-03-11 13:24:06 +01:00
gac-x06da
5ca9972383 WIP 2025-03-11 13:24:06 +01:00
gac-x06da
3b9a86c8c8 WIP 2025-03-11 13:24:06 +01:00
gac-x06da
e64b5b2c3d WIP 2025-03-11 13:24:06 +01:00
gac-x06da
284914dc53 Keyword auto updates for alignment scans 2025-03-11 13:24:06 +01:00
gac-x06da
d8a178ae13 Bump 2025-03-11 13:24:06 +01:00
gac-x06da
e79a3f785a Helical scan with proggress bar 2025-03-11 13:24:06 +01:00
gac-x06da
963b775200 Fitting and pneumatic valve 2025-03-11 13:24:06 +01:00
gac-x06da
9ea249fff7 Fix from Klaus 2025-03-11 13:24:06 +01:00
gac-x06da
2930673bbc Two SmarGon axis versions 2025-03-11 13:24:06 +01:00
gac-x06da
6a45a6a357 GUI commands run individually but script runs into timeout 2025-03-11 13:24:06 +01:00
gac-x06da
a794e3f60d Workaround on motor schema expected by BEC to move 2025-03-11 13:24:06 +01:00
gac-x06da
6b4a175f78 Another SmarGon client approach 2025-03-11 13:24:06 +01:00
gac-x06da
c8a1add697 More stable SmarGon movement 2025-03-11 13:24:06 +01:00
gac-x06da
a5642b5db2 Bump to create branch 2025-03-11 13:24:06 +01:00
ci_update_bot
d1c2dbb46b docs: Update device list 2025-01-30 11:29:00 +00:00
93d79eccd4 Update pyproject.toml 2025-01-30 12:24:31 +01:00
gac-x06da
1e81aa34b9 Plugins were not meant to run standalone 2025-01-30 12:15:15 +01:00
gac-x06da
c5b97bd592 Plugins were not meant to run standalone 2025-01-30 12:13:37 +01:00
gac-x06da
42d518c2e4 Plugins were not meant to run standalone 2025-01-30 12:09:12 +01:00
gac-x06da
9a40cbd8ae Enabling AD plugin to crash 2025-01-30 11:11:15 +01:00
gac-x06da
59bd4aeb9a First helical scan passed 2025-01-30 10:43:38 +01:00
gac-x06da
a455a490c6 Flaking 2025-01-29 13:13:21 +01:00
gac-x06da
22c46f8f8e Mono scan works 2025-01-28 15:45:10 +01:00
gac-x06da
4b76d1b191 omove works without crash 2025-01-27 18:27:09 +01:00
gac-x06da
4fc31e5f5d Samcam image preview and smargon waiting 2025-01-27 17:37:53 +01:00
gac-x06da
286c7a4bff Array preview also works 2025-01-27 15:13:16 +01:00
gac-x06da
7ab682817f Working samcam stream preview with StdDaq client 2025-01-27 12:25:27 +01:00
gac-x06da
ac3d82f7ef Merge branch 'main' into feature/device-aerotech-a3200 2025-01-24 16:21:49 +01:00
gac-x06da
023e0aab2e Fixing SmarGon axes 2025-01-24 16:17:04 +01:00
gac-x06da
21bd57393f Blacking and cleanup of unused code 2025-01-24 15:42:46 +01:00
gac-x06da
82d51649ee Blacking and cleanup of unused code 2025-01-24 15:21:59 +01:00
gac-x06da
015bf2ee3b Blacking 2025-01-24 15:13:24 +01:00
gac-x06da
24302c244d Fix for the new scan type validation 2025-01-24 15:06:45 +01:00
gac-x06da
a8990f8de2 axis client getting ready 2025-01-22 17:42:50 +01:00
gac-x06da
8da2ed4102 BEC style A3200 seems working 2025-01-20 18:41:04 +01:00
gac-x06da
03a5850bbf SmarGon reads 2025-01-15 17:44:46 +01:00
gac-x06da
d3d016108e First tries with ABR 2025-01-15 14:46:51 +01:00
gac-x06da
0b99a82ae9 Waiting for raster scan works 2024-12-17 17:41:47 +01:00
Unknown MX Person
add46d8b0d A3200 cleanup 2024-12-13 17:44:52 +01:00
Unknown MX Person
78c75b1769 Moving towards beamline startup 2024-11-13 14:30:57 +01:00
434ddad89a added license 2024-10-01 10:21:41 +02:00
Unknown MX Person
b0703552f2 Aerotech scans work nicely 2024-09-20 14:38:04 +02:00
Unknown MX Person
14ca9bd74a Daily commit mit some patching 2024-09-19 17:30:13 +02:00
Unknown MX Person
2563471ac8 Dailiy commit 2024-09-18 18:26:54 +02:00
Unknown MX Person
23aadabfd1 First Aerotech scan works 2024-09-18 12:49:33 +02:00
Unknown MX Person
3bf21ff647 Both ABR stage and scan instantiates, need Zac to test it safely 2024-09-09 13:23:03 +02:00
Unknown MX Person
602317faa8 A3200 starts to get ready for scanning 2024-08-28 13:23:06 +02:00
Unknown MX Person
13c6d7b8fb Starting to move things to scans 2024-08-27 17:58:33 +02:00
e32642526f build: pandas and matplotlib added as dependencies 2024-08-14 15:50:35 +02:00
e7fd8e453d Still a lot to do 2024-07-16 14:32:09 +02:00
4a6e4092ca Merge branch 'main' into feature/device-aerotech-a3200 2024-07-09 11:15:13 +02:00
f22487a45b Merge branch 'features/updating-naming-convention' into 'main'
Updating naming convention

See merge request bec/pxiii_bec!15
2024-07-09 10:34:42 +02:00
ace0303301 Naming convention updates for OP 2024-07-08 15:42:03 +02:00
8c4ade4034 Naming convention updates for OP 2024-07-08 15:23:50 +02:00
a799321f98 Merge branch 'fix/bec-widgets-auto-update-import' into 'main'
fix(auto_updates): import from bec_widgets cli autoupdates fixed

See merge request bec/pxiii_bec!14
2024-07-08 14:42:57 +02:00
989dc4be36 fix(auto_updates): import from bec_widgets cli autoupdates fixed 2024-07-08 14:20:21 +02:00
65800812a5 ECMC virtal energy motors 2024-06-26 10:53:54 +02:00
12803b4b6f Merge branch 'fix/epics_ca_addr_list' into 'main'
Changed EPICS_CA_ADDR_LIST to PX III

See merge request bec/pxiii_bec!13
2024-06-23 11:09:53 +02:00
2ee2b25c21 Cleaned up imports and PVs 2024-06-21 12:13:57 +02:00
gac-x06da
1834e6f55d Klaus changed EPICS_CA_ADDR_LIST to PX III 2024-06-21 11:07:39 +02:00
1a204693dc Added rocking 2024-06-13 13:34:02 +02:00
20dff942c1 Basic PVPositioner works 2024-06-13 12:41:32 +02:00
eec897f713 Device draft from office 2024-06-13 12:07:03 +02:00
5c4a0f92bc Merge branch 'feat/db-hide-compact-config' into 'main'
Hiding compact config from tests

See merge request bec/pxiii_bec!11
2024-06-07 16:22:57 +02:00
b9f0574876 Hiding compact config from tests 2024-06-07 16:15:21 +02:00
f0da85e930 Added keithley diodes in ES 2024-06-07 15:28:22 +02:00
8bb7d2332c All devices load for now 2024-06-07 15:07:14 +02:00
ci_update_bot
6b32ee6ee9 docs: Update device list 2024-06-07 10:04:44 +00:00
84eb0c74c6 Preliminary DB generator from ophyd_devices 2024-06-07 12:02:52 +02:00
824706d1d8 Merge branch 'fix_imports' into 'main'
fix: adapt import to refactoring in bec_lib

See merge request bec/pxiii_bec!10
2024-05-15 18:58:04 +02:00
7cd2a4d44c fix: adapt import to refactoring in bec_lib 2024-05-15 15:10:54 +02:00
2a1180cc90 ci: added child pipeline var 2024-05-13 16:39:31 +02:00
a32d0821ed ci: added ci file 2024-05-07 12:59:02 +02:00
a6f8d6936e Merge branch 'fix/pre_startup' into 'main'
fix: added get config for PX-III

See merge request bec/pxiii_bec!9
2024-04-29 17:29:27 +02:00
fa52c5eee1 fix: added get config for PX-III 2024-04-29 14:00:38 +02:00
1448c6b82a Merge branch 'feature/update_script' into 'main'
feat: added auto updates for bec figures

See merge request bec/pxiii-bec!7
2024-04-24 07:28:17 +02:00
f768808776 Merge branch 'fix/file_writer' into 'main'
fix: added file writer entry point

See merge request bec/pxiii-bec!8
2024-04-23 19:09:17 +02:00
265c09d746 fix: added file writer entry point 2024-04-23 18:27:08 +02:00
cae8614cb5 feat: added auto updates for bec figures 2024-04-23 13:25:08 +02:00
be5c35a938 build: fixed ds plugin name 2024-04-23 09:56:38 +02:00
2d18332e23 Merge branch 'build/plugin_update' into 'main'
build: updated to new plugin structure

See merge request bec/pxiii-bec!6
2024-04-23 08:08:21 +02:00
b9d471361a build: added CA startup entrypoint 2024-04-22 19:33:01 +02:00
ce4d57c597 build: updated to new plugin structure 2024-04-22 19:18:27 +02:00
e81c8b1484 Merge branch 'doc-pmodule-update' into 'master'
doc: psi-python311/2024.02 is now stable

See merge request bec/pxiii-bec!5
2024-03-13 21:30:26 +01:00
1a84335bbb doc: psi-python311/2024.02 is now stable 2024-03-12 13:33:24 +01:00
e87598bc7e Merge branch 'drop-python39' into 'master'
ci: drop python/3.9

See merge request bec/pxiii-bec!4
2024-03-05 22:39:41 +01:00
106f3bed6e ci: drop python/3.9 2024-03-05 12:59:22 +01:00
1de99e47d6 Merge branch 'python39' into 'master'
transition to python/3.9

See merge request bec/pxiii-bec!3
2023-12-12 21:16:36 +01:00
0cdb8e3ca6 refactor: update to psi-python39/2021.11 in deploy script 2023-12-12 16:51:58 +01:00
3887cdb835 build: require python3.9 2023-12-12 16:36:33 +01:00
d89ba54768 Merge branch 'fix-imports' into 'master'
fix: update imports

See merge request bec/pxiii-bec!2
2023-12-08 12:45:36 +01:00
04b08b9d21 fix: update imports 2023-12-08 12:37:37 +01:00
50 changed files with 8126 additions and 316 deletions

7
.gitlab-ci.yml Normal file
View File

@@ -0,0 +1,7 @@
include:
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "pxiii"
target: "pxiii_bec"
branch: $CHILD_PIPELINE_BRANCH

28
LICENSE Normal file
View File

@@ -0,0 +1,28 @@
BSD 3-Clause License
Copyright (c) 2024, Paul Scherrer Institute
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -1 +0,0 @@
from .bec_client import *

View File

@@ -1 +0,0 @@
from .plugins import *

View File

@@ -1,245 +0,0 @@
from bec_client.scan_manager import ScanReport
from bec_utils.devicemanager import Device
# pylint:disable=undefined-variable
# pylint: disable=too-many-arguments
def dscan(
motor1: Device, m1_from: float, m1_to: float, steps: int, exp_time: float, **kwargs
) -> ScanReport:
"""Relative line scan with one device.
Args:
motor1 (Device): Device that should be scanned.
m1_from (float): Start position relative to the current position.
m1_to (float): End position relative to the current position.
steps (int): Number of steps.
exp_time (float): Exposure time.
Returns:
ScanReport: Status object.
Examples:
>>> dscan(dev.motor1, -5, 5, 10, 0.1)
"""
return scans.line_scan(
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=True, **kwargs
)
def d2scan(
motor1: Device,
m1_from: float,
m1_to: float,
motor2: Device,
m2_from: float,
m2_to: float,
steps: int,
exp_time: float,
**kwargs
) -> ScanReport:
"""Relative line scan with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device relative to its current position.
m1_to (float): End position of the first device relative to its current position.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device relative to its current position.
m2_to (float): End position of the second device relative to its current position.
steps (int): Number of steps.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> d2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.line_scan(
motor1,
m1_from,
m1_to,
motor2,
m2_from,
m2_to,
steps=steps,
exp_time=exp_time,
relative=True,
**kwargs
)
def ascan(motor1, m1_from, m1_to, steps, exp_time, **kwargs):
"""Absolute line scan with one device.
Args:
motor1 (Device): Device that should be scanned.
m1_from (float): Start position.
m1_to (float): End position.
steps (int): Number of steps.
exp_time (float): Exposure time.
Returns:
ScanReport: Status object.
Examples:
>>> ascan(dev.motor1, -5, 5, 10, 0.1)
"""
return scans.line_scan(
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=False, **kwargs
)
def a2scan(motor1, m1_from, m1_to, motor2, m2_from, m2_to, steps, exp_time, **kwargs):
"""Absolute line scan with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device.
m1_to (float): End position of the first device.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device.
m2_to (float): End position of the second device.
steps (int): Number of steps.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> a2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.line_scan(
motor1,
m1_from,
m1_to,
motor2,
m2_from,
m2_to,
steps=steps,
exp_time=exp_time,
relative=False,
**kwargs
)
def dmesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
"""Relative mesh scan (grid scan) with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device relative to its current position.
m1_to (float): End position of the first device relative to its current position.
m1_steps (int): Number of steps for motor1.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device relative to its current position.
m2_to (float): End position of the second device relative to its current position.
m2_steps (int): Number of steps for motor2.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> dmesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.grid_scan(
motor1,
m1_from,
m1_to,
m1_steps,
motor2,
m2_from,
m2_to,
m2_steps,
exp_time=exp_time,
relative=True,
)
def amesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
"""Absolute mesh scan (grid scan) with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device.
m1_to (float): End position of the first device.
m1_steps (int): Number of steps for motor1.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device.
m2_to (float): End position of the second device.
m2_steps (int): Number of steps for motor2.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> amesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.grid_scan(
motor1,
m1_from,
m1_to,
m1_steps,
motor2,
m2_from,
m2_to,
m2_steps,
exp_time=exp_time,
relative=False,
)
def umv(*args) -> ScanReport:
"""Updated absolute move (i.e. blocking) for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> umv(dev.samx, 1)
>>> umv(dev.samx, 1, dev.samy, 2)
"""
return scans.umv(*args, relative=False)
def umvr(*args) -> ScanReport:
"""Updated relative move (i.e. blocking) for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> umvr(dev.samx, 1)
>>> umvr(dev.samx, 1, dev.samy, 2)
"""
return scans.umv(*args, relative=True)
def mv(*args) -> ScanReport:
"""Absolute move for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> mv(dev.samx, 1)
>>> mv(dev.samx, 1, dev.samy, 2)
"""
return scans.mv(*args, relative=False)
def mvr(*args) -> ScanReport:
"""Relative move for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> mvr(dev.samx, 1)
>>> mvr(dev.samx, 1, dev.samy, 2)
"""
return scans.mv(*args, relative=True)

View File

@@ -1 +0,0 @@

View File

@@ -1,25 +0,0 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to set up the BEC client configuration. The script is
executed in the global namespace of the BEC client. This means that all
variables defined here are available in the BEC client.
To set up the BEC client configuration, use the ServiceConfig class. For example,
to set the configuration file path, add the following lines to the script:
import pathlib
from bec_lib.core import ServiceConfig
current_path = pathlib.Path(__file__).parent.resolve()
CONFIG_PATH = f"{current_path}/<path_to_my_config_file.yaml>"
config = ServiceConfig(CONFIG_PATH)
If this startup script defined a ServiceConfig object, the BEC client will use
it to configure itself. Otherwise, the BEC client will use the default config.
"""
# example:
# current_path = pathlib.Path(__file__).parent.resolve()
# CONFIG_PATH = f"{current_path}/../../../bec_config.yaml"
# config = ServiceConfig(CONFIG_PATH)

View File

@@ -4,7 +4,7 @@
BEAMLINE_REPO=gitlab.psi.ch:bec/pxiii-bec.git
git clone git@$BEAMLINE_REPO
module add psi-python38/2020.11
module add psi-python311/2024.02
# start redis
docker run --network=host --name redis-bec -d redis
@@ -27,4 +27,3 @@ pip install -e ./pxiii-bec
# start the BEC server
bec-server start --config ./pxiii-bec/deployment/bec-server-config.yaml

View File

@@ -16,31 +16,30 @@ parse the --session argument, add the following lines to the script:
if args.session == "my_session":
print("Loading my_session session")
from bec_plugins.bec_client.plugins.my_session import *
from bec_plugins.bec_ipython_client.plugins.my_session import *
else:
print("Loading default session")
from bec_plugins.bec_client.plugins.default_session import *
from bec_plugins.bec_ipython_client.plugins.default_session import *
"""
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module
import argparse
from bec_lib.core import bec_logger
from bec_lib import bec_logger
logger = bec_logger.logger
logger.info("Using the PXIII startup script.")
logger.info("Using the PX-III startup script.")
parser = argparse.ArgumentParser()
parser.add_argument("--session", help="Session name", type=str, default="cSAXS")
args = parser.parse_args()
# pylint: disable=import-error
_args = _main_dict["args"]
# SETUP BEAMLINE INFO
from bec_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo
_session_name = "PX-III"
if _args.session.lower() == "alignment":
# load the alignment session
_session_name = "Alignment"
logger.success("Alignment session loaded.")
bec._beamline_mixin._bl_info_register(SLSInfo)
bec._beamline_mixin._bl_info_register(OperatorInfo)
# SETUP PROMPTS
bec._ip.prompts.username = "PXIII"
bec._ip.prompts.username = _session_name
bec._ip.prompts.status = 1

View File

@@ -0,0 +1,23 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to add additional command line arguments.
"""
from bec_lib.service_config import ServiceConfig
def extend_command_line_args(parser):
"""
Extend the command line arguments of the BEC client.
"""
parser.add_argument("--session", help="Session name", type=str, default="PX-III")
return parser
def get_config() -> ServiceConfig:
"""
Create and return the service configuration.
"""
return ServiceConfig(redis={"host": "x06da-bec-001", "port": 6379})

View File

@@ -0,0 +1 @@
from .auto_updates import PlotUpdate

View File

@@ -0,0 +1,173 @@
from bec_widgets.cli.auto_updates import AutoUpdates, ScanInfo
from bec_widgets.cli.rpc.rpc_base import RPCResponseTimeoutError
class PlotUpdate(AutoUpdates):
create_default_dock = True
dock_name = "default_dock"
enabled = True
_scan_msg = None
# def __init__(self, gui: BECDockArea):
# super().__init__(gui)
def start_default_dock(self):
"""
Create a default dock for the auto updates.
"""
self._default_dock = self.gui.add_dock(self.dock_name)
self._default_dock.add_widget("BECWaveformWidget")
self._default_fig = self._default_dock.widget_list[0]
def do_update(self, msg):
"""Save the original scan message for future use"""
self._scan_msg = msg
return super().do_update(msg)
# def simple_line_scan(self, info: ScanInfo) -> None:
# """
# Simple line scan.
# """
# dev_x = info.scan_report_devices[0]
# dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
# if not dev_y:
# return
# self.figure.clear_all()
# plt = self.figure.plot(dev_x, dev_y)
# plt.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def plot_handler(self, info: ScanInfo) -> None:
"""Simple keyword handler for 'plot'
This simple keyword handler looks for the keyword 'plot' in the scan arguments.
This allows the user to explictly specify the desired data source. Useful for alignment
scans.
"""
print(info.scan_report_devices)
dev_x = info.scan_report_devices[0]
# Keyword lookup for 'plot' and 'fit'
if "kwargs" in self._scan_msg.info:
signals = self._scan_msg.info["kwargs"].get("plot", None)
fit = self._scan_msg.info["kwargs"].get("fit", None)
else:
signals = None
fit = None
if not signals:
return
if isinstance(signals, str):
signals = [signals]
if isinstance(fit, str):
fit = [fit]
# try:
# self.gui.clear_all()
# except RPCResponseTimeoutError:
# pass
# try:
# self._default_dock = self.gui.add_dock(self.dock_name)
# except RPCResponseTimeoutError:
# pass
# try:
# self._default_fig = self._default_dock.add_widget("BECWaveformWidget")
# except RPCResponseTimeoutError:
# pass
# try:
# self._default_fig = self._default_dock.widget_list[0]
# except RPCResponseTimeoutError:
# pass
# # dck1 = self._default_dock
# # dck1.clear_all()
plt1 = self.get_default_figure()
# # Clear figure
# try:
# old_data = yield plt1.get_all_data()
# except RPCResponseTimeoutError:
# old_data = [1,2,3,4,5,6,7,8,9]
# print(old_data)
# for _ in range(len(old_data)):
# try:
# plt1.remove_curve(-1)
# except RPCResponseTimeoutError:
# pass
# except Exception as ex:
# print(f"{ex}\t{type(ex)}")
# clear_all() will throw RPCResponseTimeoutError
# try:
# plt1.clear_all()
# except RPCResponseTimeoutError:
# pass
print(type(plt1), plt1)
print(f"Plotted signals: {signals}")
# plot() will throw RPCResponseTimeoutError
try:
if len(info.scan_report_devices) == 2:
# 2D plot
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
plt1.plot(
x_name=dev_x,
y_name=dev_y,
z_name=signals[0],
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
z_label=signals[0],
)
elif len(info.scan_report_devices) == 1:
# 1D plot
dev_x = info.scan_report_devices[0]
for sig in signals:
try:
plt1.plot(
x_name=dev_x,
y_name=sig,
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=sig,
)
except RPCResponseTimeoutError:
pass
else:
# Default is 1D
dev_x = info.scan_report_devices[0]
for sig in signals:
try:
plt1.plot(
x_name=dev_x,
y_name=sig,
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=sig,
)
except RPCResponseTimeoutError:
pass
except RPCResponseTimeoutError:
pass
if fit is not None and len(fit):
dev_x = info.scan_report_devices[0]
sig = signals[0]
plt1.add_dap(dev_x, sig, dap=fit[0])
def handler(self, info: ScanInfo) -> None:
"""Dock configuration handler"""
# EXAMPLES:
# if info.scan_name == "line_scan" and info.scan_report_devices:
# self.simple_line_scan(info)
# return
# if info.scan_name == "grid_scan" and info.scan_report_devices:
# self.run_grid_scan_update(info)
# return
super().handler(info)
self.plot_handler(info)

2107
pxiii_bec/config_saved.yaml Normal file

File diff suppressed because it is too large Load Diff

2094
pxiii_bec/config_saved.yaml~ Normal file

File diff suppressed because it is too large Load Diff

View File

View File

View File

@@ -0,0 +1,11 @@
import os
def setup_epics_ca():
os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "NO"
os.environ["EPICS_CA_ADDR_LIST"] = "129.129.110.255"
os.environ["PYTHONIOENCODING"] = "latin1"
def run():
setup_epics_ca()

View File

View File

@@ -0,0 +1,703 @@
sls_current:
description: SLS current
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'ARS07-DPCT-0100:CURR', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- ring
- fe
readOnly: true
softwareTrigger: false
vg0_press:
description: VG0 pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-FE-VMCC-0000:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
abs_press:
description: Absorber pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-FE-ABS1-VMCC-1010:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
sldi_cenx:
description: FE slit-diaphragm horizontal center
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENX'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_sizex:
description: FE slit-diaphragm horizontal size
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:SIZEX'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_ceny:
description: FE slit-diaphragm vertical center
deviceClass: ophyd_devices.EpicsMotorEC
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_sizey:
description: FE slit-diaphragm vertical size
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:SIZEY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
slh_trxr:
description: OP slit inner blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-SLH:TRXR'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
slh_trxw:
description: OP slit outer blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-SLH:TRXW'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
fi1_try:
description: Beam attenuator motion before mono
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-FI1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_theta1:
description: Monochromator pitch 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_diode:
description: Diode between mono crystals
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XPM1:BOT:READOUT', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
dccm_theta2:
description: Monochromator pitch 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH2'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_xbpm:
description: XBPM total intensity after monochromator
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XBPM1:SumAll:MeanValue_RBV', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
dccm_energy:
description: Monochromator energy
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:ENERGY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_eoffset:
description: Monochromator energy offset
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:EOFFSET'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxbpm_trx:
description: XBPM motion before secondary source
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxbpm_try:
description: XBPM motion before secondary source
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# ssxbpm:
# description: XBPM before secondary source
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig: {read_pv: 'X06DA-ES-SSBPM1:SumAll:MeanValue_RBV'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: true
# softwareTrigger: false
ssslit_trxr:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXR'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssslit_trxw:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXW'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssslit_tryt:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYT'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssslit_tryb:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYB'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxi1_trx:
description: Secondary source diagnostic screen motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxi1_try:
description: Secondary source diagnostic screen motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:TRXU'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:TRXD'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# vfm_tryuw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYUW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# vfm_tryr:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# vfm_trydw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYDW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
vfm_pitch:
description: KB mirror vertical steering
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:PITCH'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:YAW'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:ROLL'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:TRX'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-VFM:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:TRXU'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:TRXD'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# hfm_tryur:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYUR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# hfm_tryw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# hfm_trydr:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYDR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
hfm_pitch:
description: KB mirror horizontal steering
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:PITCH'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:YAW'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:ROLL'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:TRX'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-HFM:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# xbox_xbpm:
# description: Exposure box XBPM
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig: {read_pv: 'X06DA-ES-XBBPM1:SumAll:MeanValue_RBV'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: true
# softwareTrigger: false
xbox_fil1:
description: Exposure box filter wheel 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI1:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil2:
description: Exposure box filter wheel 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI2:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil3:
description: Exposure box filter wheel 3
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI3:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil4:
description: Exposure box filter wheel 4
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI4:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_diode:
description: Exposure box diode
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DI1:READOUT'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
gonpos:
description: Sample sensor distance
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DF1:CBOX-USER1', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
gonvalid:
description: Sample in valid distance
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DF1:CBOX-CMP1', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
samzoom:
description: Sample microscope zoom
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SAMCAM:ZOOM'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
samcam:
description: Sample camera aggregate device
deviceClass: pxiii_bec.devices.SamCamDetector
deviceConfig: {prefix: 'X06DA-SAMCAM:'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
samstream:
description: Sample camera ZMQ stream
deviceClass: pxiii_bec.devices.StdDaqPreviewDetector
deviceConfig:
url: 'tcp://129.129.110.12:9089'
deviceTags:
- detector
enabled: true
readoutPriority: async
readOnly: false
softwareTrigger: false
# samimg:
# description: Sample camera image from EPICS
# deviceClass: pxiii_bec.devices.NDArrayPreview
# deviceConfig:
# prefix: 'X06DA-SAMCAM:image1:'
# deviceTags:
# - detector
# enabled: true
# readoutPriority: async
# readOnly: false
# softwareTrigger: false
bstop_pneum:
description: Beamstop pneumatic in-out
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_x:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_y:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_z:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_pneum:
description: Beamstop pneumatic
deviceClass: pxiii_bec.devices.PneumaticValve
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS', kind: 'config', auto_monitor: true, put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false
bstop_diode:
description: Beamstop diode
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-BS:READOUT', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
frontlight:
description: Microscope frontlight
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-FL:SET-BRGHT', kind: 'config', put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
backlight:
description: Backlight reflector
deviceClass: pxiii_bec.devices.PneumaticValve
deviceConfig: {read_pv: 'X06DA-ES-BL:GET-POS', write_pv: 'X06DA-ES-BL:SET-POS', kind: 'config', auto_monitor: true, put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false
det_y:
description: Pilatus height
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
det_z:
description: Pilatus translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
gmx:
description: ABR horizontal stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMX', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
gmy:
description: ABR vertical stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMY', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
gmz:
description: ABR axial stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMZ', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
omega:
description: ABR rotation stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:OMEGA', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
abr:
description: Aerotech ABR motion system
deviceClass: pxiii_bec.devices.AerotechAbrStage
deviceConfig: {prefix: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shx:
description: SmarGon X axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: -2, high_limit: 2, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shy:
description: SmarGon Y axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: -2, high_limit: 2, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shz:
description: SmarGon Z axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: 10, high_limit: 22, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
chi:
description: SmarGon CHI axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: 0, high_limit: 40, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
phi:
description: SmarGon PHI axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false

410
pxiii_bec/devices/A3200.py Normal file
View File

@@ -0,0 +1,410 @@
"""
``Aerotech`` --- Aerotech control software
******************************************
This module provides an object to control the Aerotech Abr rotational stage.
Methods in the Abr class
========================
Standard bluesky interface:
AerotechAbrStage.configure(d={...})
AerotechAbrStage.kickoff()
AerotechAbrStage.stop()
Additional bluesky functionality:
Aerotech.is_homed()
Aerotech.do_homing(wait=True)
Aerotech.get_ready(ostart=None, orange=None, etime=None, wait=True)
Aerotech.is_done()
Aerotech.is_ready()
Aerotech.is_busy()
Aerotech.start_exposure()
Aerotech.wait_status(status)
Aerotech.move(angle, wait=False, speed=None)
Aerotech.set_shutter(state)
Returns the axis mode: ``-ES-DF1:AXES-MODE``
Examples
========
abr = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr")
# move omega to 270.0 degrees
abr.omega = 270.0
# move omega to 180 degrees and wait for movement to finish
abr.move(180, wait=True)
# move omega to 3000 degrees at 360 degrees/s and wait for movement to finish
abr.move(3000, velocity=360, wait=True)
# stop any movement
abr.stop() # this function only returns after the STATUS is back to OK
"""
import time
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.bec_device_base import BECDeviceBase, CustomPrepare
try:
from .A3200enums import AbrCmd, AbrMode
except ImportError:
from A3200enums import AbrCmd, AbrMode
from bec_lib import bec_logger
logger = bec_logger.logger
# pylint: disable=logging-fstring-interpolation
class AerotechAbrMixin(CustomPrepare):
"""Configuration class for the Aerotech A3200 controller for the ABR stage"""
def on_stage(self):
"""
NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything.
"""
logger.warning(f"Configuring {self.parent.scaninfo.scan_msg.info['scan_name']} on ABR")
d = {}
if self.parent.scaninfo.scan_type in ("measure", "measurement", "fly"):
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
scanname = self.parent.scaninfo.scan_msg.info["scan_name"]
if scanname in (
"standardscan",
"helicalscan",
"helicalscan1",
"helicalscan2",
"helicalscan3",
):
d["scan_command"] = AbrCmd.MEASURE_STANDARD
d["var_1"] = scanargs["start"]
d["var_2"] = scanargs["range"]
d["var_3"] = scanargs["move_time"]
d["var_4"] = scanargs.get("ready_rate", 500)
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("verticallinescan", "vlinescan"):
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
d["var_1"] = scanargs["range"] / scanargs["steps"]
d["var_2"] = scanargs["steps"]
d["var_3"] = scanargs["exp_time"]
d["var_4"] = 0
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("screeningscan"):
d["scan_command"] = AbrCmd.SCREENING
d["var_1"] = scanargs["start"]
d["var_2"] = scanargs["oscrange"]
d["var_3"] = scanargs["exp_time"]
d["var_4"] = scanargs["range"] / scanargs["steps"]
d["var_5"] = scanargs["steps"]
d["var_6"] = scanargs.get("delta", 0.5)
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("rasterscan", "rastersimplescan"):
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
d["var_1"] = scanargs["exp_time"]
d["var_2"] = scanargs["range_x"] / scanargs["steps_x"]
d["var_3"] = scanargs["range_y"] / scanargs["steps_y"]
d["var_4"] = scanargs["steps_x"]
d["var_5"] = scanargs["steps_y"]
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
# Reconfigure if got a valid scan config
if len(d) > 0:
self.parent.configure(d)
# Stage the parent
self.parent.bluestage()
def on_kickoff(self):
"""Kick off parent"""
self.parent.bluekickoff()
def on_unstage(self):
"""Unstage the ABR controller"""
self.parent.blueunstage()
class AerotechAbrStage(BECDeviceBase):
"""Standard PX stage on A3200 controller
This is the wrapper class for the standard rotation stage layout for the PX
beamlines at SLS. It wraps the main rotation axis OMEGA (Aerotech ABR)and
the associated motion axes GMX, GMY and GMZ. The ophyd class associates to
the general PX measurement procedure, which is that the actual scan script
is running as an AeroBasic program on the controller and we communicate to
it via 10+1 global variables.
"""
custom_prepare_cls = AerotechAbrMixin
USER_ACCESS = ["reset", "kickoff", "bluekickoff", "complete", "set_axis_mode", "arm", "disarm"]
taskStop = Component(EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted)
status = Component(EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted)
clear = Component(EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted)
# Enable/disable motor movement via the IOC (i.e. make it task-only)
axisModeLocked = Component(EpicsSignal, "-DF1:LOCK", put_complete=True, kind=Kind.omitted)
axisModeDirect = Component(
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted
)
axisAxesMode = Component(EpicsSignal, "-DF1:AXES-MODE", put_complete=True, kind=Kind.omitted)
# Shutter box is missing readback so the -GET signal is installed on the VME
# _shutter = Component(
# EpicsSignal, "-PH1:GET", write_pv="-PH1:SET", put_complete=True, kind=Kind.config,
# )
# Status flags for all axes
omega_done = Component(EpicsSignalRO, "-DF1:OMEGA-DONE", kind=Kind.normal)
gmx_done = Component(EpicsSignalRO, "-DF1:GMX-DONE", kind=Kind.normal)
gmy_done = Component(EpicsSignalRO, "-DF1:GMY-DONE", kind=Kind.normal)
gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", kind=Kind.normal)
# For some reason the task interface is called PSO...
scan_command = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
start_command = Component(
EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted
)
stop_command = Component(
EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted
)
# Global variables to controll AeroBasic scripts
_var_1 = Component(EpicsSignal, "-PSO:VAR-1", put_complete=True, kind=Kind.omitted)
_var_2 = Component(EpicsSignal, "-PSO:VAR-2", put_complete=True, kind=Kind.omitted)
_var_3 = Component(EpicsSignal, "-PSO:VAR-3", put_complete=True, kind=Kind.omitted)
_var_4 = Component(EpicsSignal, "-PSO:VAR-4", put_complete=True, kind=Kind.omitted)
_var_5 = Component(EpicsSignal, "-PSO:VAR-5", put_complete=True, kind=Kind.omitted)
_var_6 = Component(EpicsSignal, "-PSO:VAR-6", put_complete=True, kind=Kind.omitted)
_var_7 = Component(EpicsSignal, "-PSO:VAR-7", put_complete=True, kind=Kind.omitted)
_var_8 = Component(EpicsSignal, "-PSO:VAR-8", put_complete=True, kind=Kind.omitted)
_var_9 = Component(EpicsSignal, "-PSO:VAR-9", put_complete=True, kind=Kind.omitted)
_var_10 = Component(EpicsSignal, "-PSO:VAR-10", put_complete=True, kind=Kind.omitted)
# Task status PVs (programs always run on task 1)
task1 = Component(EpicsSignalRO, "-AERO:TSK1-DONE", auto_monitor=True)
task2 = Component(EpicsSignalRO, "-AERO:TSK2-DONE", auto_monitor=True)
task3 = Component(EpicsSignalRO, "-AERO:TSK3-DONE", auto_monitor=True)
task4 = Component(EpicsSignalRO, "-AERO:TSK4-DONE", auto_monitor=True)
scan_done = Component(EpicsSignal, "-GRD:SCAN-DONE", kind=Kind.config)
def set_axis_mode(self, mode: str, settle_time=0.1) -> None:
"""Set axis mode to direct/measurement mode.
Measurement ensures that the scrips run undisturbed by blocking axis
motion commands from the IOC (i.e. internal only).
Parameters:
-----------
mode : str
Valid values are 'direct' and 'measuring'.
"""
if mode == "direct":
self.axisModeDirect.set(37, settle_time=settle_time).wait()
if mode == "measuring":
self.axisAxesMode.set(AbrMode.MEASURING, settle_time=settle_time).wait()
def configure(self, d: dict) -> tuple:
""" " Configure the exposure scripts
Script execution at the PX beamlines is based on scripts that are always
running on the controller that execute commands when commanded by
setting a pre-defined set of global variables. This method performs the
configuration of the exposure scrips by setting the required global
variables.
Parameters in d: dict
---------------------
scan_command: int
The index of the desired AeroBasic program to be executed.
Usually supported values are taken from an Enum.
var_1:
var_2:
var_3:
var_4:
var_5:
var_6:
var_7:
var_8:
var_9:
var_10:
"""
old = self.read_configuration()
# ToDo: Check if idle before reconfiguring
self.scan_command.set(d["scan_command"]).wait()
# Set the corresponding global variables
if "var_1" in d and d["var_1"] is not None:
self._var_1.set(d["var_1"]).wait()
if "var_2" in d and d["var_2"] is not None:
self._var_2.set(d["var_2"]).wait()
if "var_3" in d and d["var_3"] is not None:
self._var_3.set(d["var_3"]).wait()
if "var_4" in d and d["var_4"] is not None:
self._var_4.set(d["var_4"]).wait()
if "var_5" in d and d["var_5"] is not None:
self._var_5.set(d["var_5"]).wait()
if "var_6" in d and d["var_6"] is not None:
self._var_6.set(d["var_6"]).wait()
if "var_7" in d and d["var_7"] is not None:
self._var_7.set(d["var_7"]).wait()
if "var_8" in d and d["var_8"] is not None:
self._var_8.set(d["var_8"]).wait()
if "var_9" in d and d["var_9"] is not None:
self._var_9.set(d["var_9"]).wait()
if "var_10" in d and d["var_10"] is not None:
self._var_10.set(d["var_10"]).wait()
new = self.read_configuration()
return old, new
def bluestage(self):
"""Bluesky-style stage
Since configuration synchronization is not guaranteed, this does
nothing. The script launched by kickoff().
"""
def bluekickoff(self, timeout=1) -> SubscriptionStatus:
"""Kick off the set program"""
self.start_command.set(1).wait()
# Define wait until the busy flag goes high
def is_busy(*, value, **_):
return bool(value == 0)
# Subscribe and wait for update
status = SubscriptionStatus(self.scan_done, is_busy, timeout=timeout, settle_time=0.1)
status.wait()
# return status
def blueunstage(self, settle_time=0.1):
"""Stops current script and releases the axes"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1).wait()
# Go to direct mode
self.set_axis_mode("direct", settle_time=settle_time)
def complete(self, timeout=None) -> SubscriptionStatus:
"""Waits for task execution
NOTE: Original complete was raster scanner complete...
"""
# Define wait until the busy flag goes down (excluding initial update)
def is_idle(*, value, **_):
return bool(value == 1)
# Subscribe and wait for update
# status = SubscriptionStatus(self.task1, is_idle, timeout=timeout, settle_time=0.5)
status = SubscriptionStatus(self.scan_done, is_idle, timeout=timeout, settle_time=0.5)
return status
def reset(self, settle_time=0.1, wait_after_reload=1) -> None:
"""Resets the Aerotech controller state
Attempts to reset the currently running measurement task on the A3200
by stopping current motions, reloading aerobasic programs and going
to DIRECT mode.
This will stop movements in both DIRECT and MEASURING modes. During the
stop the `status` temporarely goes to ERROR but reverts to OK after a
couple of seconds.
"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1, settle_time=settle_time)
# Reload tasks
self.taskStop.set(1, settle_time=wait_after_reload).wait()
# Go to direct mode
self.set_axis_mode("direct", settle_time=settle_time)
# pylint: disable=arguments-differ
def stop(self, settle_time=1.0) -> None:
"""Stops current motions"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1).wait()
# Go to direct mode
self.set_axis_mode("direct", settle_time=settle_time)
def is_ioc_ok(self):
"""Checks execution status"""
return 0 == self.status.get()
@property
def axis_mode(self):
"""Read axis mode"""
return self.axisAxesMode.get()
# @property
# def shutter(self):
# return self._shutter.get()
# @shutter.setter
# def shutter(self, value):
# if self.axisAxesMode.get():
# print("ABR is not in direct mode; cannot manipulate shutter")
# return False
# state = str(state).lower()
# if state not in ["1", "0", "closed", "open"]:
# print("unknown shutter state requested")
# return None
# elif state in ["1", "open"]:
# state = 1
# elif state == ["0", "closed"]:
# state = 0
# self._shutter.set(state).wait()
# return state == self._shutter.get()
def wait_for_movements(self, timeout=60.0):
"""Waits for all motor movements"""
t_start = time.time()
t_elapsed = 0
while self.is_moving() and t_elapsed < timeout:
t_elapsed = time.time() - t_start
if timeout is not None and t_elapsed > timeout:
raise TimeoutError("Timeout waiting for all axis to stop moving")
time.sleep(0.5)
def is_moving(self):
"""Chechs if all axes are DONE"""
return not (
self.omega_done.get()
and self.gmx_done.get()
and self.gmy_done.get()
and self.gmz_done.get()
)
if __name__ == "__main__":
abr = AerotechAbrStage(prefix="X06DA-ES", name="abr")
abr.wait_for_connection()

View File

@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
"""
Enumerations for the MX specific Aerotech A3200 stage.
@author: mohacsi_i
"""
# pylint: disable=too-few-public-methods
class AbrStatus:
"""ABR measurement task status"""
DONE = 0
READY = 1
BUSY = 2
class AbrGridStatus:
"""ABR grid scan status"""
BUSY = 0
DONE = 1
class AbrMode:
"""ABR mode status"""
DIRECT = 0
MEASURING = 1
class AbrShutterStatus:
"""ABR shutter status"""
CLOSE = 0
OPEN = 1
class AbrCmd:
"""ABR command table"""
NONE = 0
RASTER_SCAN_SIMPLE = 1
MEASURE_STANDARD = 2
VERTICAL_LINE_SCAN = 3
SCREENING = 4
SUPER_FAST_OMEGA = 5
STILL_WEDGE = 6
STILLS = 7
REPEAT_SINGLE_OSCILLATION = 8
SINGLE_OSCILLATION = 9
OLD_FASHIONED = 10
RASTER_SCAN = 11
JET_ROTATION = 12
X_HELICAL = 13
X_RUNSEQ = 14
JUNGFRAU = 15
MSOX = 16
SLIT_SCAN = 17
RASTER_SCAN_STILL = 18
SCAN_SASTT = 19
SCAN_SASTT_V2 = 20
SCAN_SASTT_V3 = 21
class AbrAxis:
"""ABR axis index"""
OMEGA = 1
GMX = 2
GMY = 3
GMZ = 4
STY = 5
STZ = 6

View File

@@ -0,0 +1,259 @@
# -*- coding: utf-8 -*-
"""
Created on Tue Jun 11 11:28:38 2024
@author: mohacsi_i
"""
import types
from collections import OrderedDict
from ophyd import Component, PVPositioner, Signal, EpicsSignal, EpicsSignalRO, Kind, PositionerBase
from ophyd.status import Status, MoveStatus
from bec_lib import bec_logger
from .A3200enums import AbrMode
logger = bec_logger.logger
# ABR_DONE = 0
# ABR_READY = 1
# ABR_BUSY = 2
# GRID_SCAN_BUSY = 0
# GRID_SCAN_DONE = 1
# DIRECT_MODE = 0
# MEASURING_MODE = 1
class A3200Axis(PVPositioner):
"""Positioner wrapper for A3200 axes
Positioner wrapper for motors on the Aerotech A3200 controller. As the IOC
does not provide a motor record, this class simply wraps axes into a
standard Ophyd positioner for the BEC. It also has some additional
functionality for error checking and diagnostics.
Examples
--------
omega = A3200Axis('X06DA-ES-DF1:OMEGA', base_pv='X06DA-ES')
Parameters
----------
prefix : str
Axis PV name root.
base_pv : str (situational)
IOC PV name root, i.e. X06DA-ES if standalone class.
"""
USER_ACCESS = ["omove"]
abr_mode_direct = Component(
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted
)
abr_mode = Component(
EpicsSignal, "-DF1:AXES-MODE", auto_monitor=True, put_complete=True, kind=Kind.omitted
)
# Basic PV positioner interface
done = Component(EpicsSignalRO, "-DONE", auto_monitor=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "-RBV", auto_monitor=True, kind=Kind.hinted)
# Setpoint is one of the two...
setpoint = Component(EpicsSignal, "-SETP", kind=Kind.config)
# setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config)
velocity = Component(EpicsSignal, "-SETV", kind=Kind.config)
status = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config)
# PV to issue native relative movements on the A3200
relmove = Component(EpicsSignal, "-INCP", put_complete=True, kind=Kind.config)
# PV to home axis
home = Component(EpicsSignal, "-HOME", kind=Kind.config)
ishomed = Component(EpicsSignal, "-AS00", kind=Kind.config)
# HW status words
dshw = Component(EpicsSignalRO, "-DSHW", auto_monitor=True, kind=Kind.normal)
ashw = Component(EpicsSignalRO, "-ASHW", auto_monitor=True, kind=Kind.normal)
fslw = Component(EpicsSignalRO, "-FSLW", auto_monitor=True, kind=Kind.normal)
# Rock movement
_rock = Component(EpicsSignal, "-ROCK", put_complete=True, kind=Kind.config)
_rock_dist = Component(EpicsSignal, "-RINCP", put_complete=True, kind=Kind.config)
_rock_velo = Component(EpicsSignal, "-RSETV", put_complete=True, kind=Kind.config)
_rock_count = Component(EpicsSignal, "-COUNT", put_complete=True, kind=Kind.config)
# _rock_accel = Component(EpicsSignal, "-RRATE", put_complete=True, kind=Kind.config)
hlm = Component(Signal, kind=Kind.config)
llm = Component(Signal, kind=Kind.config)
vmin = Component(Signal, kind=Kind.config)
vmax = Component(Signal, kind=Kind.config)
offset = Component(EpicsSignal, "-OFF", put_complete=True, kind=Kind.config)
# pylint: disable=too-many-arguments
def __init__(
self,
prefix="",
*,
name,
base_pv="",
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
llm=0,
hlm=0,
vmin=0,
vmax=0,
**kwargs,
):
"""__init__ MUST have a full argument list"""
# Patching the parent's PVs into the axis class to check for direct/locked mode
if parent is None:
def maybe_add_prefix(self, _, kw, suffix):
# Patched not to enforce parent prefix when no parent
if kw in self.add_prefix:
return suffix
return suffix
self.__class__.__dict__["abr_mode"].maybe_add_prefix = types.MethodType(
maybe_add_prefix, self.__class__.__dict__["abr_mode"]
)
self.__class__.__dict__["abr_mode_direct"].maybe_add_prefix = types.MethodType(
maybe_add_prefix, self.__class__.__dict__["abr_mode_direct"]
)
logger.info(self.__class__.__dict__["abr_mode"].kwargs)
self.__class__.__dict__["abr_mode"].suffix = base_pv + "-DF1:AXES-MODE"
self.__class__.__dict__["abr_mode_direct"].suffix = base_pv + "-DF1:MODE-DIRECT"
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.llm.set(llm).wait()
self.hlm.set(hlm).wait()
self.vmin.set(vmin).wait()
self.vmax.set(vmax).wait()
def omove(
self,
position,
wait=True,
timeout=None,
# moved_cb=None,
velocity=None,
relative=False,
direct=False,
**kwargs,
) -> MoveStatus:
"""Native absolute/relative movement on the A3200"""
# Check if we're in direct movement mode
if self.abr_mode.value not in (AbrMode.DIRECT, "DIRECT"):
if direct:
self.abr_mode_direct.set(1).wait()
else:
raise RuntimeError(f"ABR axis not in direct mode: {self.abr_mode.value}")
# Before moving, ensure we can stop (if a stop_signal is configured).
if self.stop_signal is not None:
self.stop_signal.wait_for_connection()
# Set velocity if provided
if velocity is not None:
self.velocity.set(velocity).wait()
# This is adapted from pv_positioner.py
self.check_value(position)
# Get MoveStatus from parent of parent
status = PositionerBase.move(self, position=position, timeout=timeout, **kwargs)
has_done = self.done is not None
if not has_done:
moving_val = 1 - self.done_value
self._move_changed(value=self.done_value)
self._move_changed(value=moving_val)
try:
if relative:
# Relative movement instead of setpoint
self.relmove.put(position, wait=True)
else:
# Standard absolute movement
self.setpoint.put(position, wait=True)
if wait:
status.wait()
except KeyboardInterrupt:
self.stop()
raise
return status
def describe(self):
"""Workaround to schema expected by the BEC"""
d = super().describe()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def read(self) -> OrderedDict[str, dict]:
"""Workaround to schema expected by the BEC"""
d = super().read()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def move(self, position, wait=True, timeout=None, moved_cb=None, **kwargs) -> MoveStatus:
"""Exposes the ophyd move command through BEC abstraction"""
return self.omove(position, wait=wait, timeout=timeout, moved_cb=moved_cb, **kwargs)
def rock(self, distance, counts: int, velocity=None, wait=True) -> Status:
"""Repeated single axis zigzag scan I guess PSO should be configured for this"""
self._rock_dist.put(distance)
self._rock_count.put(counts)
if velocity is not None:
self._rock_velo.put(velocity)
# if acceleration is not None:
# self._rock_accel.put(acceleration)
self._rock.put(1)
status = super().move(position=distance)
if wait:
status.wait()
return status
# def is_omega_ok(self):
# """Checks omega axis status"""
# return 0 == self.self.omega.status.get()
# def is_homed(self):
# """Checks if omega is homed"""
# return 1 == self.omega.is_homed.get()
# def do_homing(self, wait=True):
# """Execute the homing procedure.
# Executes the homing procedure on omega and waits (default) until it is completed.
# TODO: Return a status object to do this wwith futures and monitoring.
# PARAMETERS
# `wait` true / false if the routine is to wait for the homing to finish.
# """
# self.omega.home.set(1, settle_time=1).wait()
# if not wait:
# return
# while not self.omega.is_homed():
# time.sleep(0.2)
# Automatically start an axis if directly invoked
if __name__ == "__main__":
omega = A3200Axis(prefix="X06DA-ES-DF1:OMEGA", base_pv="X06DA-ES", name="omega")
omega.wait_for_connection()

View File

@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
"""
``NDArrayPreview`` --- Standalone Preview for ImagePlugin
*********************************************************
This module provides a standalone object to receive images to ophyd from the
AreaDetector's ImagePlugin.
Created on Wed Jan 29 2025
@author: mohacsi_i
"""
import numpy as np
from ophyd import Device, Component, EpicsSignal, EpicsSignalWithRBV, Kind, Staged
from ophyd.areadetector import NDDerivedSignal
from bec_lib import bec_logger
logger = bec_logger.logger
class SilentNDDerivedSignal(NDDerivedSignal):
"""Silent version of NDDerivedSignal, it does not spam the terminal on
every defective frame (shit happens, ok?)."""
def _array_shape_callback(self, **kwargs):
try:
super()._array_shape_callback(**kwargs)
except RuntimeError:
pass
class NDArrayPreview(Device):
"""Wrapper class around AreaDetector's NDStdArray plugins
This is a standalone class to display images from AreaDetector's
ImagePlugin without using a parent device. It also offers BEC exposed
methods to transfer image and change image array Kind-ness.
NOTE: As an explicit request, it can toggle data recording
"""
# Subscriptions for plotting image
USER_ACCESS = ["image", "savemode"]
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
# Status attributes
min_callback_time = Component(
EpicsSignalWithRBV, "MinCallbackTime", kind=Kind.config, put_complete=True
)
array_size_x = Component(EpicsSignal, "ArraySize0_RBV", kind=Kind.config)
array_size_y = Component(EpicsSignal, "ArraySize1_RBV", kind=Kind.config)
array_size_z = Component(EpicsSignal, "ArraySize2_RBV", kind=Kind.config)
ndimensions = Component(EpicsSignal, "NDimensions_RBV", kind=Kind.config)
array_data = Component(EpicsSignal, "ArrayData", kind=Kind.omitted)
shaped_image = Component(
SilentNDDerivedSignal,
derived_from="array_data",
shape=("array_size_z", "array_size_y", "array_size_x"),
num_dimensions="ndimensions",
kind=Kind.omitted,
)
def read(self):
"""Stream out data on every read()"""
if self._staged == Staged.yes:
image = self.shaped_image.get()
self._run_subs(sub_type=self.SUB_MONITOR, value=image)
return super().read()
def savemode(self, save=False):
"""Toggle save mode for the shaped image"""
# pylint: disable=protected-access
if save:
self.shaped_image._kind = Kind.normal
else:
self.shaped_image._kind = Kind.omitted
def image(self):
"""Fallback method in case image streaming fills up the BEC"""
array_size = (self.array_size_z.get(), self.array_size_y.get(), self.array_size_x.get())
if array_size == (0, 0, 0):
raise RuntimeError("Invalid image; ensure array_callbacks are on")
if array_size[-1] == 0:
array_size = array_size[:-1]
image = self.array_data.get()
return np.array(image).reshape(array_size)
# Automatically connect to SAMCAM at PXIII if directly invoked
if __name__ == "__main__":
img = NDArrayPreview("X06DA-SAMCAM:image1:", name="samimg")
img.wait_for_connection()

View File

@@ -0,0 +1,47 @@
from ophyd import EpicsSignal
from ophyd.status import SubscriptionStatus
class PneumaticValve(EpicsSignal):
"""Wrapper around EpicsSignal to wait until reaching target. Use the
status returned by set() to wait until movement is finished. Do NOT
use put if you want to wait, that's a low-level PV write op.
NOTE: The SET and GET states do not match exactly
"""
def set(self, value, *, timeout=5, settle_time=0.1):
"""Overloaded setter that waits for target state
NOTE: The SubscriptionStatus callback does not run in put()
"""
# Lazy hardcoded state lookup
target = 1 if value in (1, "Measure") else 2
# Define wait until an end state is reached
def on_target(*, value, **_):
return bool(value == target)
# Subscribe a monitor in advance and wait for update
status = SubscriptionStatus(self, on_target, timeout=timeout, settle_time=0.1)
# Set value to start movement
super().set(value, settle_time=settle_time).wait()
# Return the monitor
return status
def check_value(self, value):
"""Input validation"""
if value not in (0, 1, "Measure", "Park"):
raise ValueError(f"Unsupported pneumatic valve target {value}")
return super().check_value(value)
if __name__ == "__main__":
pneum = PneumaticValve(
read_pv="X06DA-ES-BS:GET-POS",
write_pv="X06DA-ES-BS:SET-POS",
auto_monitor=True,
put_complete=True,
name="bspump",
)
pneum.wait_for_connection()

View File

@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
"""
``SamCam`` --- Sample Camera control software
*********************************************
This module provides an object to control the sample camera at the PX III
beamline. The camera should run continously and stream data via ZMQ for
the GUI and alignment scripts.
Created on Thu Jan 30 2025
@author: mohacsi_i
"""
from ophyd import ADComponent
from ophyd_devices.devices.areadetector.cam import GenICam
# from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
PSIDetectorBase,
CustomDetectorMixin,
)
from bec_lib import bec_logger
logger = bec_logger.logger
class SamCamSetup(CustomDetectorMixin):
"""Simple camera mixin class, the SAMCAM is usually streaming"""
def on_stage(self):
"""Just make sure it's running continously"""
self.parent.cam.acquire.put(1, wait=True)
def on_unstage(self):
"""Should run continously"""
def on_stop(self):
"""Should run continously"""
class SamCamDetector(PSIDetectorBase):
"""Sample camera device
The SAMCAM continously streams images to the GUI and sample alignment
scripts via ZMQ.
"""
custom_prepare_cls = SamCamSetup
cam = ADComponent(GenICam, "cam1:")
# image = ADComponent(ImagePlugin_V35, "image1:")

View File

@@ -0,0 +1,284 @@
"""
``SmarGon`` --- SmarGon control software
******************************************
The module provides an object to control the SmarGon goniometer axes at PX III.
The SmarGon axes are interfaced as positioners.
"""
import time
from threading import Thread, Lock
import requests
from requests.adapters import HTTPAdapter, Retry
from collections import OrderedDict
from ophyd import Component, Kind, Signal, PVPositioner
from ophyd.status import SubscriptionStatus
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("SmarGon")
# SmarGon contoller can't really handle multiple connections
# Use this mutex to ensure one access at a time
mutex = Lock()
class SmarGonSignal(Signal):
"""SmarGonSignal (R/W)
Small helper class to read/write parameters from SmarGon. As there is no
motion status readback from smargopolo, this should be substituted with
setting with 'settle_time'.
"""
def __init__(self, *args, write_addr="targetSCS", low_limit=None, high_limit=None, **kwargs):
super().__init__(*args, **kwargs)
self.write_addr = write_addr
self.addr = self.parent.name
self._limits = (low_limit, high_limit)
# self.get()
def put(self, value, *, timestamp=None, **kwargs):
"""Overriden put to add communication with smargopolo"""
# Validate new value and get timestamp
self.check_value(value)
if timestamp is None:
timestamp = time.time()
# Perform the actual write to SmargoPolo
# pylint: disable=protected-access
r = self.parent._go_n_put(f"{self.write_addr}?{self.addr.upper()}={value}")
# pylint: disable=attribute-defined-outside-init
old_value = self._readback
self._timestamp = timestamp
self._readback = r[self.addr.upper()]
self._value = r[self.addr.upper()]
# Notify subscribers
self._run_subs(
sub_type=self.SUB_VALUE, old_value=old_value, value=value, timestamp=self._timestamp
)
@property
def limits(self):
return self._limits
def check_value(self, value, **kwargs):
"""Check if value falls within limits"""
lol = self.limits[0]
if lol is not None:
if value < lol:
raise ValueError(f"Target {value} outside of limits {self.limits}")
hil = self.limits[1]
if hil is not None:
if value > hil:
raise ValueError(f"Target {value} outside of limits {self.limits}")
def get(self, **kwargs):
# pylint: disable=protected-access
r = self.parent._go_n_get(self.write_addr)
# pylint: disable=attribute-defined-outside-init
self._value = r[self.addr.upper()] if isinstance(r, dict) else r
return super().get(**kwargs)
class SmarGonSignalRO(Signal):
"""Small helper class for read-only parameters PVs from SmarGon.
Reads and optionally monitors a variable on the SmarGon.
"""
def __init__(self, *args, read_addr="readbackSCS", auto_monitor=False, **kwargs):
super().__init__(*args, **kwargs)
self._metadata["write_access"] = False
self.read_addr = read_addr
self.addr = self.parent.name
if auto_monitor:
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def get(self, **kwargs):
# pylint: disable=protected-access
r = self.parent._go_n_get(self.read_addr)
if isinstance(r, dict):
self.put(r[self.addr.upper()], force=True)
else:
self.put(r, force=True)
return self._readback
def poll(self):
"""Fooo"""
time.sleep(2)
while True:
time.sleep(0.25)
try:
self.get()
except requests.ConnectTimeout as ex:
logger.error(f"[{self.name}] {ex}")
class SmarGonAxis(PVPositioner):
"""SmarGon client deice
This class controls the SmarGon goniometer via the REST interface. All
SmarGon axes share a common mutex to manage actual HW access.
"""
USER_ACCESS = ["omove"]
# Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
corr = Component(SmarGonSignalRO, read_addr="corr_type", kind=Kind.config)
mode = Component(SmarGonSignalRO, read_addr="mode", kind=Kind.config)
# Axis parameters
readback = Component(SmarGonSignalRO, kind=Kind.hinted, auto_monitor=True)
setpoint = Component(SmarGonSignal, kind=Kind.normal)
done = Component(Signal, value=1, kind=Kind.normal)
# moving = Component(SmarGonMovingSignalRO, kind=Kind.config)
_tol = 0.001
# pylint: disable=too-many-arguments
def __init__(
self,
prefix="SCS",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
low_limit=None,
high_limit=None,
**kwargs,
) -> None:
self.__class__.__dict__["readback"].kwargs["read_addr"] = f"readback{prefix}"
self.__class__.__dict__["setpoint"].kwargs["write_addr"] = f"target{prefix}"
self.__class__.__dict__["setpoint"].kwargs["low_limit"] = low_limit
self.__class__.__dict__["setpoint"].kwargs["high_limit"] = high_limit
self.__class__.__dict__["sg_url"].kwargs["value"] = sg_url
# Fine-tune HTTP connection behavior
# NOTE: SmarGon has a few failed requests every one in a while
self._s = requests.Session()
retries = Retry(total=5, backoff_factor=0.05, status_forcelist=[500, 502, 503, 504])
self._s.mount("http://", HTTPAdapter(max_retries=retries))
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
def initialize(self):
"""Helper function for initial readings"""
# self.corr.get()
# self.mode.get()
r = self._go_n_get("corr_type")
print(r)
def move(self, position, wait=True, timeout=None, moved_cb=None):
"""Move command that's masked by BEC"""
return self.omove(position, wait, timeout, moved_cb)
def omove(self, position, wait=True, timeout=2.0, moved_cb=None):
"""Original move command without the BEC wrappers"""
status = self.setpoint.set(position, settle_time=0.1)
status.wait()
if not wait:
return status
def on_target(*, value, **_):
distance = abs(value - self.setpoint._value)
print(f"[self.name] Distance: {distance}")
return bool(distance < self._tol)
status = SubscriptionStatus(self.readback, on_target, timeout=timeout, settle_time=0.1)
return status
def describe(self):
"""Workaround to schema expected by the BEC"""
d = super().describe()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def read(self) -> OrderedDict[str, dict]:
"""Workaround to schema expected by the BEC"""
d = super().read()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def _pos_changed(self, timestamp=None, value=None, **kwargs):
pass
def _go_n_get(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
try:
with mutex:
r = self._s.get(cmd, timeout=1, **kwargs)
except TimeoutError:
try:
time.sleep(0.05)
with mutex:
r = self._s.get(cmd, timeout=0.5, **kwargs)
except TimeoutError:
time.sleep(0.05)
with mutex:
r = self._s.get(cmd, timeout=0.5, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error getting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
def _go_n_put(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
try:
with mutex:
r = self._s.put(cmd, timeout=1, **kwargs)
except TimeoutError:
try:
time.sleep(0.05)
with mutex:
r = self._s.put(cmd, timeout=0.5, **kwargs)
except TimeoutError:
time.sleep(0.05)
with mutex:
r = self._s.put(cmd, timeout=0.5, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error putting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
if __name__ == "__main__":
shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000")
shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000")
shz = SmarGonAxis(
prefix="SCS",
name="shz",
low_limit=10,
high_limit=22,
sg_url="http://x06da-smargopolo.psi.ch:3000",
)
shx.wait_for_connection()
shy.wait_for_connection()
shz.wait_for_connection()

View File

@@ -0,0 +1,249 @@
"""
``SmarGon`` --- SmarGon control software
******************************************
The module provides an object to control the SmarGon goniometer axes at PX III.
The SmarGon axes are interfaced as positioners.
"""
import time
import threading
from collections import OrderedDict
import requests
from requests.adapters import HTTPAdapter, Retry
from ophyd import Component, Kind, Signal, PVPositioner
from ophyd.status import SubscriptionStatus
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("SmarGon")
# SmarGon contoller can't really handle multiple connections
# Use this mutex to ensure one access at a time
mutex = threading.Lock()
class LimitedSmarGonSignal(Signal):
"""SmarGonSignal (R/W)
Small helper class to read/write parameters from SmarGon. As there is no
motion status readback from smargopolo, this should be substituted with
setting with 'settle_time'.
"""
def __init__(self, *args, write_addr="targetSCS", low_limit=None, high_limit=None, **kwargs):
self._limits = (low_limit, high_limit)
super().__init__(*args, **kwargs)
self.write_addr = write_addr
@property
def limits(self):
return self._limits
def check_value(self, value, **kwargs):
"""Check if value falls within limits"""
lol = self.limits[0]
if lol is not None:
if value < lol:
raise ValueError(f"Target {value} outside of limits {self.limits}")
hil = self.limits[1]
if hil is not None:
if value > hil:
raise ValueError(f"Target {value} outside of limits {self.limits}")
def put(self, value, *, timestamp=None, force=False, metadata=None, **kwargs):
"""Overriden put to add communication with smargopolo"""
# Validate new value and get timestamp
if not force:
self.check_value(value)
if timestamp is None:
timestamp = time.time()
# Perform the actual write to SmargoPolo
# pylint: disable=protected-access
r = self.parent._go_n_put(f"{self.write_addr}?{self.parent.name.upper()}={value}")
# pylint: disable=attribute-defined-outside-init
old_value = self._readback
self._timestamp = timestamp
self._readback = r[self.parent.name.upper()]
self._value = r[self.parent.name.upper()]
# Notify subscribers
self._run_subs(
sub_type=self.SUB_VALUE, old_value=old_value, value=value, timestamp=self._timestamp
)
class SmarGonAxis(PVPositioner):
"""SmarGon client deice
This class controls the SmarGon goniometer via the REST interface. All
SmarGon axes share a common mutex to manage actual HW access.
"""
USER_ACCESS = ["omove", "oldmove"]
# Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
# Axis parameters
readback = Component(Signal, kind=Kind.hinted, metadata={"write_access": False})
setpoint = Component(LimitedSmarGonSignal, kind=Kind.normal)
done = Component(Signal, value=1, kind=Kind.normal, metadata={"write_access": False})
_tol = 0.001
# pylint: disable=too-many-arguments
def __init__(
self,
prefix="SCS",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
low_limit=None,
high_limit=None,
**kwargs,
) -> None:
# self.__class__.__dict__["setpoint"].kwargs["write_addr"] = f"target{prefix}"
self.__class__.__dict__["setpoint"].kwargs["low_limit"] = low_limit
self.__class__.__dict__["setpoint"].kwargs["high_limit"] = high_limit
self.__class__.__dict__["sg_url"].kwargs["value"] = sg_url
# Fine-tune HTTP connection behavior
# NOTE: SmarGon has a few failed requests every one in a while
self._s = requests.Session()
retries = Retry(total=5, backoff_factor=0.05, status_forcelist=[500, 502, 503, 504])
self._s.mount("http://", HTTPAdapter(max_retries=retries))
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
def on_target():
"""Monitors the setpoint and readback and calculates the on_target flag"""
time.sleep(2)
while True:
# Read back target and setpoint values
# pylint: disable=protected-access
r = self._go_n_get("readbackSCS")
rb = r[self.name.upper()]
self.readback.set(rb, force=True).wait()
r = self._go_n_get("targetSCS")
sp = r[self.name.upper()]
self.setpoint._value = sp
# print(f"Readback: {rb}\tSetpoint: {sp}")
# Check if they're within tolerance
distance = abs(rb - sp)
done = 1 if distance < self._tol else 0
self.done.put(done, force=True)
time.sleep(0.2)
self._mon = threading.Thread(target=on_target, daemon=True)
self._mon.start()
def omove(self, position, wait=True, timeout=None, moved_cb=None):
"""Move command that's masked by BEC"""
self.done.put(0, force=True)
return self.move(position, wait, timeout, moved_cb)
def oldmove(self, position, wait=True, timeout=2.0, moved_cb=None):
"""Original move command without the BEC wrappers"""
status = self.setpoint.set(position, settle_time=0.1).wait()
if not wait:
return status
def on_target(*, value, **_):
distance = abs(value - self.setpoint._value)
print(f"[self.name] Distance: {distance}")
return bool(distance < self._tol)
status = SubscriptionStatus(self.readback, on_target, timeout=timeout, settle_time=0.1)
return status
def describe(self):
"""Workaround to schema expected by the BEC"""
d = super().describe()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def read(self) -> OrderedDict[str, dict]:
"""Workaround to schema expected by the BEC"""
d = super().read()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def _pos_changed(self, timestamp=None, value=None, **kwargs):
"""Remove EPICS dependency"""
pass
def _go_n_get(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
try:
with mutex:
r = self._s.get(cmd, timeout=1, **kwargs)
except TimeoutError:
try:
time.sleep(0.05)
with mutex:
r = self._s.get(cmd, timeout=0.5, **kwargs)
except TimeoutError:
time.sleep(0.05)
with mutex:
r = self._s.get(cmd, timeout=0.5, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error getting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
def _go_n_put(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
try:
with mutex:
r = self._s.put(cmd, timeout=1, **kwargs)
except TimeoutError:
try:
time.sleep(0.05)
with mutex:
r = self._s.put(cmd, timeout=0.5, **kwargs)
except TimeoutError:
time.sleep(0.05)
with mutex:
r = self._s.put(cmd, timeout=0.5, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error putting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
if __name__ == "__main__":
shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000")
shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000")
shz = SmarGonAxis(
prefix="SCS",
name="shz",
low_limit=10,
high_limit=22,
sg_url="http://x06da-smargopolo.psi.ch:3000",
)
shx.wait_for_connection()
shy.wait_for_connection()
shz.wait_for_connection()

View File

@@ -0,0 +1,393 @@
# #!/usr/bin/env python3
# from time import sleep, time
# from typing import Tuple
# from requests import get, put
# from beamline import beamline
# from mx_redis import SMARGON
# try:
# from mx_preferences import get_config
# host = get_config(beamline)["smargon"]["host"]
# port = get_config(beamline)["smargon"]["port"]
# except Exception:
# host = "x06da-smargopolo.psi.ch"
# port = 3000
# base = f"http://{host}:{port}"
# def gonget(thing: str, **kwargs) -> dict:
# """issue a GET for some API component on the smargopolo server"""
# cmd = f"{base}/{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# r = get(cmd)
# if not r.ok:
# raise Exception(f"error getting {thing}; server returned {r.status_code} => {r.reason}")
# return r.json()
# def gonput(thing: str, **kwargs):
# """issue a PUT for some API component on the smargopolo server"""
# cmd = f"{base}/{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# put(cmd)
# def scsput(**kwargs):
# """
# Issue a new absolute target in the SH coordinate system.
# The key "verbose" may be passed in kwargs with any true
# value for verbose behaviour.
# :param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi")
# :type kwargs: dict
# :return:
# :rtype:
# """
# xyz = {
# k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
# }
# thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
# cmd = f"{base}/targetSCS?{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# put(cmd)
# def bcsput(**kwargs):
# """
# Issue a new absolute target in the beamline coordinate system.
# The key "verbose" may be passed in kwargs with any true
# value for verbose behaviour.
# :param kwargs: a dict containing keys ("bx", "by", "bz", "chi", "phi")
# :return:
# :rtype:
# """
# xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz", "chi", "phi")}
# thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
# cmd = f"{base}/targetBCS?{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# put(cmd)
# def scsrelput(**kwargs) -> None:
# """
# Issue relative increments to current SH coordinate system.
# The key "verbose" may be passed in kwargs with any true
# value for verbose behaviour.
# :param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi")
# :type kwargs: dict
# :return:
# :rtype:
# """
# xyz = {
# k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
# }
# thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
# cmd = f"{base}/targetSCS_rel?{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# put(cmd)
# def bcsrelput(**kwargs):
# """
# Issue relative increments to current beamline coordinate system.
# The key "verbose" may be passed in kwargs with any true
# value for verbose behaviour.
# :param kwargs: a dict containing keys ("bx", "by", "bz")
# :type kwargs: dict
# :return:
# :rtype:
# """
# xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz")}
# thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
# cmd = f"{base}/targetBCS_rel?{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# put(cmd)
# # url_redis = f"{beamline}-cons-705.psi.ch"
# # print(f"connecting to redis DB #3 on host: {url_redis}")
# # redis_handle = redis.StrictRedis(host=url_redis, db=3)
# # pubsub = redis_handle.pubsub()
# MODE_UNINITIALIZED = 0
# MODE_INITIALIZING = 1
# MODE_READY = 2
# MODE_ERROR = 99
# class SmarGon(object):
# def __init__(self):
# super(SmarGon, self).__init__()
# self.__dict__.update(target=None)
# self.__dict__.update(bookmarks={})
# self.__dict__.update(_latest_message={})
# # pubsub.psubscribe(**{f"__keyspace@{SMARGON.value}__:*": self._cb_readbackSCS})
# # pubsub.run_in_thread(sleep_time=0.5, daemon=True)
# def __repr__(self):
# BX, BY, BZ, OMEGA, CHI, PHI, a, b, c = self.readback_bcs().values()
# return f"<{self.__class__.__name__} X={BX:.3f}, Y={BY:.3f}, Z={BZ:.3f}, CHI={CHI:.3f}, PHI={PHI:.3f}, OMEGA={OMEGA:.3f}>"
# def _cb_readbackSCS(self, msg):
# if msg["data"] in ["hset"]:
# self._latest_message = msg
# def move_home(self, wait=False) -> None:
# """move to beamline coordinate system X, Y, Z, Chi, Phi = 0 0 0 0 0"""
# self.apply_bookmark_sh({"shx": 0.0, "shy": 0.0, "shz": 18.0, "chi": 0.0, "phi": 0.0})
# if wait:
# self.wait_home()
# def xyz(self, coords: Tuple[float, float, float], wait: bool = True) -> None:
# """
# Move smargon in absolute beamline coordinates
# :param coords: a tuple of floats representing X, Y, Z coordinates
# :type coords:
# :param wait:
# :type wait:
# :return:
# :rtype:
# """
# x, y, z = coords
# # the two steps below are necessary otherwise the control system
# # remembers *a* previous CHI
# bcs = self.bcs
# bcs.update({"BX": x, "BY": y, "BZ": z})
# self.bcs = bcs
# if wait:
# self.wait()
# def wait_home(self, timeout: float = 20.0) -> None:
# """
# wait for the smargon to reach its home position:
# SHX = 0.0
# SHY = 0.0
# SHZ = 18.0
# CHI = 0.0
# PHI = 0.0
# :param timeout: time to wait for positions to be reached raises TimeoutError if timeout reached
# :type timeout: float
# :return:
# :rtype:
# """
# tout = timeout + time()
# in_place = [False, False]
# rbv = -999.0
# while not all(in_place) and time() < tout:
# rbv = self.readback_scs()
# in_place = []
# for k, v in {"SHX": 0.0, "SHY": 0.0, "SHZ": 18.0, "CHI": 0.0, "PHI": 0.0}.items():
# in_place.append(abs(rbv[k] - v) < 0.01)
# if time() > tout:
# raise TimeoutError(f"timeout waiting for smargon to reach home position: {rbv}")
# def push_bookmark(self):
# """
# save current absolute coordinates in FIFO stack
# :return:
# :rtype:
# """
# t = round(time())
# self.bookmarks[t] = self.readback_scs()
# def pop_bookmark(self):
# return self.bookmarks.popitem()[1]
# def apply_bookmark_sh(self, scs):
# scsput(**scs)
# def apply_last_bookmark_sh(self):
# scs = self.pop_bookmark()
# scsput(**scs)
# def readback_mcs(self):
# """current motor positions of the smargon sliders"""
# return gonget("readbackMCS")
# def readback_scs(self):
# """current SH coordinates of the smargon model"""
# return gonget("readbackSCS")
# def readback_bcs(self):
# """current beamline coordinates of the smargon"""
# return gonget("readbackBCS")
# def target_scs(self):
# """currently assigned targets for the smargon control system"""
# return gonget("targetSCS")
# def initialize(self):
# """initialize the smargon"""
# self.set_mode(MODE_UNINITIALIZED)
# sleep(0.1)
# self.set_mode(MODE_INITIALIZING)
# def set_mode(self, mode: int):
# """put smargon control system in a given mode
# MODE_UNINITIALIZED = 0
# MODE_INITIALIZING = 1
# MODE_READY = 2
# MODE_ERROR = 99
# """
# gonput(f"mode?mode={mode}")
# def enable_correction(self):
# """enable calibration based corrections"""
# gonput("corr_type?corr_type=1")
# def disable_correction(self):
# """disable calibration based corrections"""
# gonput("corr_type?corr_type=0")
# def chi(self, val=None, wait=False):
# if val is None:
# return self.readback_scs()["CHI"]
# scsput(CHI=val)
# if wait:
# timeout = 10 + time()
# while time() < timeout:
# if abs(val - self.readback_scs()["CHI"]) < 0.1:
# break
# if time() > timeout:
# raise RuntimeError(f"SmarGon CHI did not reach requested target {val} in time")
# def phi(self, val=None, wait=False):
# if val is None:
# return self.readback_scs()["PHI"]
# scsput(PHI=val)
# if wait:
# timeout = 70 + time()
# while time() < timeout:
# if abs(val - self.readback_scs()["PHI"]) < 0.1:
# break
# if time() > timeout:
# raise RuntimeError(f"SmarGon PHI did not reach requested target {val} in time")
# def wait(self, timeout=60.0):
# """waits up to `timeout` seconds for smargon to reach target"""
# target = {
# k.upper(): v
# for k, v in self.target_scs().items()
# if k.lower() in ("shx", "shy", "shz", "chi", "phi")
# }
# timeout = timeout + time()
# while time() < timeout:
# s = {
# k: (abs(v - target[k]) < 0.01)
# for k, v in self.readback_scs().items()
# if k.upper() in ("SHX", "SHY", "SHZ", "CHI", "PHI")
# }
# if all(list(s.values())):
# break
# if time() > timeout:
# raise TimeoutError("timed out waiting for smargon to reach target")
# def __setattr__(self, key, value):
# key = key.lower()
# if key == "mode":
# self.set_mode(value)
# elif key == "correction":
# assert value in (
# 0,
# 1,
# False,
# True,
# ), "correction is either 1 or True (enabled) or 0 (disabled)"
# gonput(f"corr_type?corr_type?{value}")
# elif key == "scs":
# scsput(**value)
# elif key == "bcs":
# bcsput(**value)
# elif key == "target":
# if not isinstance(value, dict):
# raise Exception(
# f"expected a dict with target axis and values got something else: {value}"
# )
# for k in value.keys():
# if k.lower() not in "shx shy shz chi phi ox oy oz".split():
# raise Exception(f'unknown axis in target "{k}"')
# scsput(**value)
# elif key in "shx shy shz chi phi ox oy oz".split():
# scsput(**{key: value})
# elif key in "bx by bz".split():
# bcs = self.readback_bcs()
# bcs[key] = value
# bcsput(**bcs)
# else:
# self.__dict__[key].update(value)
# def __getattr__(self, key):
# key = key.lower()
# if key == "mode":
# return self.readback_mcs()["mode"]
# elif key == "correction":
# return gonget("corr_type")
# elif key == "bcs":
# return self.readback_bcs()
# elif key == "mcs":
# return self.readback_mcs()
# elif key == "scs":
# return self.readback_scs()
# elif key in "shx shy shz chi phi ox oy oz".split():
# return self.readback_scs()[key.upper()]
# elif key in "bx by bz".split():
# return self.readback_bcs()[key.upper()]
# else:
# return self.__getattribute__(key)
# if __name__ == "__main__":
# import argparse
# parser = argparse.ArgumentParser(description="SmarGon client")
# parser.add_argument("-i", "--initialize", help="initialize smargon", action="store_true")
# args = parser.parse_args()
# smargon = SmarGon()
# if args.initialize:
# print("initializing smargon device")
# import Aerotech
# print("moving aerotech back by 50mm")
# abr = Aerotech.Abr()
# abr.incr_x(-50.0, wait=True, velo=100.0)
# print("issuing init command to smargon")
# smargon.initialize()
# sleep(0.5)
# print("waiting for init routine to complete")
# while MODE_READY != smargon.mode:
# sleep(0.5)
# print("moving smargon to HOME position")
# smargon.move_home()
# print("moving aerotech to its previous position")
# abr.incr_x(50.0, wait=True, velo=100.0)
# exit(0)

View File

@@ -0,0 +1,209 @@
# -*- coding: utf-8 -*-
"""
Standard DAQ preview image stream module
Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
import json
from time import sleep, time
from threading import Thread
import zmq
import numpy as np
from ophyd import Device, Signal, Component, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from bec_lib import bec_logger
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
class StdDaqPreviewMixin(CustomDetectorMixin):
"""Setup class for the standard DAQ preview stream
Parent class: CustomDetectorMixin
"""
_mon = None
def on_stage(self):
"""Start listening for preview data stream"""
if self._mon is not None:
self.parent.unstage()
sleep(0.5)
logger.info(f"[{self.parent.name}] Attaching monitor to {self.parent.url.get()}")
self.parent.connect()
self._stop_polling = False
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def on_unstage(self):
"""Stop a running preview"""
if self._mon is not None:
self._stop_polling = True
# Might hang on recv_multipart
self._mon.join(timeout=1)
# So also disconnect the socket
try:
# pylint: disable=protected-access
self.parent._socket.disconnect(self.parent.url.get())
except zmq.error.ZMQError:
# Might be already closed
pass
def on_stop(self):
"""Stop a running preview"""
self.on_unstage()
def poll(self):
"""Collect streamed updates"""
try:
t_last = time()
while True:
try:
# Exit loop and finish monitoring
if self._stop_polling:
logger.info(f"[{self.parent.name}]\tDetaching monitor")
break
# pylint: disable=no-member
# pylint: disable=protected-access
r = self.parent._socket.recv_multipart(flags=zmq.NOBLOCK)
# Length and throtling checks
if len(r) != 2:
logger.warning(
f"[{self.parent.name}] Received malformed array of length {len(r)}"
)
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed < self.parent.throttle.get():
sleep(0.1)
continue
# Unpack the Array V1 reply to metadata and array data
meta, data = r
# Update image and update subscribers
header = json.loads(meta)
image = np.frombuffer(data, dtype=header["type"])
if image.size != np.prod(header["shape"]):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header["shape"])
# Update image and update subscribers
self.parent.array_counter.put(header["frame"], force=True)
self.parent.ndimensions.put(len(header["shape"]), force=True)
self.parent.array_size.put(header["shape"], force=True)
self.parent.array_average.put(np.mean(image), force=True)
# self.parent.array_data.put(data, force=True)
self.parent.shaped_image.put(image, force=True)
# pylint: disable=protected-access
self.parent._last_image = image
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
t_last = t_curr
logger.info(
f"[{self.parent.name}] Updated frame {header['frame']}\t"
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
)
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
logger.info(f"[{self.parent.name}]\t{str(ex)}")
raise
finally:
self._mon = None
logger.info(f"[{self.parent.name}]\tDetaching monitor")
class StdDaqPreviewDetector(PSIDetectorBase):
"""Detector wrapper class around the StdDaq preview image stream.
This was meant to provide live image stream directly from the StdDAQ but
also works with other ARRAY v1 streamers, like the AreaDetector ZMQ plugin.
Note that the preview stream must be already throtled in order to cope with
the incoming data and the python class might throttle it further.
NOTE: As an explicit request, it does not record the image data.
You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
"""
# Subscriptions for plotting image
USER_ACCESS = ["image", "savemode"]
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
custom_prepare_cls = StdDaqPreviewMixin
# Configuration attributes
url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
throttle = Component(Signal, value=0.25, kind=Kind.config)
# Streamed data status
array_average = Component(Signal, kind=Kind.hinted, metadata={"write_access": False})
array_counter = Component(Signal, kind=Kind.hinted, metadata={"write_access": False})
ndimensions = Component(Signal, kind=Kind.normal, metadata={"write_access": False})
array_size = Component(Signal, kind=Kind.normal, metadata={"write_access": False})
# array_data = Component(Signal, kind=Kind.omitted, metadata={"write_access": False})
shaped_image = Component(Signal, kind=Kind.omitted, metadata={"write_access": False})
_last_image = None
def __init__(
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
) -> None:
super().__init__(*args, parent=parent, **kwargs)
self.url.set(url, force=True).wait()
# Connect to the DAQ
self.connect()
def connect(self):
"""Connect to te StDAQs PUB-SUB streaming interface
StdDAQ may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
# pylint: disable=no-member
# Socket to talk to server
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
try:
self._socket.connect(self.url.get())
except ConnectionRefusedError:
sleep(1)
self._socket.connect(self.url.get())
def savemode(self, save=False):
"""Toggle save mode for the shaped image"""
# pylint: disable=protected-access
if save:
self.shaped_image._kind = Kind.normal
else:
self.shaped_image._kind = Kind.omitted
def image(self):
"""
Gets the last image as an attribute in case image must be abandoned
due to some caching on the BEC.
"""
return self._last_image
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":
daq = StdDaqPreviewDetector(url="tcp://129.129.95.111:20000", name="preview")
daq.wait_for_connection()

View File

@@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
"""
Ophyd devices for the PX III beamline, including the MX specific Aerotech A3200 stage.
@author: mohacsi_i
"""
from .A3200 import AerotechAbrStage
from .A3200utils import A3200Axis
from .SmarGonA import SmarGonAxis as SmarGonAxisA
from .SmarGonB import SmarGonAxis as SmarGonAxisB
from .StdDaqPreview import StdDaqPreviewDetector
from .NDArrayPreview import NDArrayPreview
from .SamCamDetector import SamCamDetector
from .PneumaticValve import PneumaticValve

View File

@@ -0,0 +1,11 @@
// This file was autogenerated. Do not edit it manually.
## Device List
### pxiii_bec
| Device | Documentation | Module |
| :----- | :------------- | :------ |
| A3200Axis | Positioner wrapper for A3200 axes<br><br><br> Positioner wrapper for motors on the Aerotech A3200 controller. As the IOC<br> does not provide a motor record, this class simply wraps axes into a<br> standard Ophyd positioner for the BEC. It also has some additional<br> functionality for error checking and diagnostics.<br><br> Examples<br> --------<br> omega = A3200Axis('X06DA-ES-DF1:OMEGA', base_pv='X06DA-ES')<br><br> Parameters<br> ----------<br> prefix : str<br> Axis PV name root.<br> base_pv : str (situational)<br> IOC PV name root, i.e. X06DA-ES if standalone class.<br> | [pxiii_bec.devices.A3200utils](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/A3200utils.py) |
| AerotechAbrStage | Standard PX stage on A3200 controller<br><br> This is the wrapper class for the standard rotation stage layout for the PX<br> beamlines at SLS. It wraps the main rotation axis OMEGA (Aerotech ABR)and<br> the associated motion axes GMX, GMY and GMZ. The ophyd class associates to<br> the general PX measurement procedure, which is that the actual scan script<br> is running as an AeroBasic program on the controller and we communicate to<br> it via 10+1 global variables.<br> | [pxiii_bec.devices.A3200](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/A3200.py) |
| NDArrayPreview | Wrapper class around AreaDetector's NDStdArray plugins<br><br> This is a standalone class to display images from AreaDetector's<br> ImagePlugin without using a parent device. It also offers BEC exposed<br> methods to transfer image and change image array Kind-ness.<br><br> NOTE: As an explicit request, it can toggle data recording<br> | [pxiii_bec.devices.NDArrayPreview](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/NDArrayPreview.py) |
| SamCamDetector | Sample camera device<br><br> The SAMCAM continously streams images to the GUI and sample alignment<br> scripts via ZMQ.<br> | [pxiii_bec.devices.SamCamDetector](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/SamCamDetector.py) |
| SmarGonAxis | SmarGon client deice<br><br> This class controls the SmarGon goniometer via the REST interface. All<br> SmarGon axes share a common mutex to manage actual HW access.<br> | [pxiii_bec.devices.SmarGonB](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/SmarGonB.py) |
| StdDaqPreviewDetector | Detector wrapper class around the StdDaq preview image stream.<br><br> This was meant to provide live image stream directly from the StdDAQ but<br> also works with other ARRAY v1 streamers, like the AreaDetector ZMQ plugin.<br> Note that the preview stream must be already throtled in order to cope with<br> the incoming data and the python class might throttle it further.<br><br> NOTE: As an explicit request, it does not record the image data.<br><br> You can add a preview widget to the dock by:<br> cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')<br> | [pxiii_bec.devices.StdDaqPreview](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/StdDaqPreview.py) |

View File

View File

@@ -0,0 +1,8 @@
from .mx_measurements import (
MeasureStandardWedge,
MeasureVerticalLine,
MeasureRasterSimple,
MeasureScreening,
MeasureHelical,
MeasureHelical2,
)

View File

@@ -0,0 +1,473 @@
"""MX measurements module
Scan primitives for standard BEC scans at the PX beamlines at SLS.
Theese scans define the event model and can be called from higher levels.
"""
import time
import numpy as np
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class AbrCmd:
"""Valid Aerotech ABR scan commands from the AeroBasic files"""
NONE = 0
RASTER_SCAN_SIMPLE = 1
MEASURE_STANDARD = 2
VERTICAL_LINE_SCAN = 3
SCREENING = 4
# SUPER_FAST_OMEGA = 5 # Some Japanese measured samples in capilaries at high RPMs
# STILL_WEDGE = 6 # NOTE: Unused Step scan
# STILLS = 7 # NOTE: Unused Just send triggers to detector
# REPEAT_SINGLE_OSCILLATION = 8 # NOTE: Unused
# SINGLE_OSCILLATION = 9
# OLD_FASHIONED = 10 # NOTE: Unused
# RASTER_SCAN = 11
# JET_ROTATION = 12 # NOTE: Unused
# X_HELICAL = 13 # NOTE: Unused
# X_RUNSEQ = 14 # NOTE: Unused
# JUNGFRAU = 15
# MSOX = 16 # NOTE: Unused
# SLIT_SCAN = 17 # NOTE: Unused
# RASTER_SCAN_STILL = 18
# SCAN_SASTT = 19
# SCAN_SASTT_V2 = 20
# SCAN_SASTT_V3 = 21
class AerotechFlyscanBase(AsyncFlyScanBase):
"""Base class for MX flyscans
Low-level base class for standard scans at the PX beamlines at SLS. Theese
scans use the A3200 rotation stage and the actual experiment is performed
using an AeroBasic script controlled via global variables. The base class
has some basic safety features like checking status then sets globals and
fires off the scan. Implementations can choose to set the corresponding
configurations in child classes or pass it as command line parameters.
IMPORTANT: The AeroBasic scripts take care of the PSO configuration.
Parameters:
-----------
abr_complete : bool
Wait for the launched ABR task to complete.
"""
scan_type = "fly"
scan_report_hint = "table"
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
# Aerotech stage config
abr_raster_reset = False
abr_complete = False
abr_timeout = None
pointID = 0
num_pos = 0
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
parameters.
"""
super().__init__(parameter=parameter, **kwargs)
if "abr_raster_reset" in self.caller_kwargs:
self.abr_raster_reset = self.caller_kwargs.get("abr_raster_reset")
if "abr_complete" in self.caller_kwargs:
self.abr_complete = self.caller_kwargs.get("abr_complete")
if "abr_timeout" in self.caller_kwargs:
self.abr_timeout = self.caller_kwargs.get("abr_timeout")
def pre_scan(self):
"""Mostly just checking if ABR stage is ok..."""
# TODO: Move roughly to start position???
# ABR status checking
stat = yield from self.stubs.send_rpc_and_wait("abr", "status.get")
if stat not in (0, "OK"):
raise RuntimeError("Aerotech ABR seems to be in error state {stat}, please reset")
task = yield from self.stubs.send_rpc_and_wait("abr", "task1.get")
# From what I got values are copied to local vars at the start of scan,
# so only kickoff should be forbidden.
if task not in (1, "OK"):
raise RuntimeError("Aerotech ABR task #1 seems to busy")
# Reset the raster scan engine
if self.abr_raster_reset:
yield from self.stubs.send_rpc_and_wait("abr", "raster_scan_done.set", 0)
# Call super
yield from super().pre_scan()
def scan_core(self):
"""The actual scan logic comes here."""
# Kick off the run
yield from self.stubs.send_rpc_and_wait("abr", "bluekickoff")
logger.info("Measurement launched on the ABR stage...")
# Wait for grid scanner to finish
if self.abr_complete:
if self.abr_timeout is not None:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.abr_timeout)
st.wait()
else:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete")
st.wait()
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = 1
return super().cleanup()
class MeasureStandardWedge(AerotechFlyscanBase):
"""Standard wedge scan using the OMEGA motor
Measure an absolute continous line scan from `start` to `start` + `range`
during `move_time` on the Omega axis with PSO output.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.standard_wedge(start=42, range=10, move_time=20)
Parameters
----------
start : (float, float)
Scan start position of the axis.
range : (float, float)
Scan range of the axis.
move_time : (float)
Total travel time for the movement [s].
ready_rate : float, optional
No clue what is this... (default=500)
"""
scan_name = "standardscan"
required_kwargs = ["start", "range", "move_time"]
class MeasureVerticalLine(AerotechFlyscanBase):
"""Vertical line scan using the GMY motor
Simple relative continous line scan that records a single vertical line
with PSO output. There's no actual stepping, it's only used for velocity
calculation.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_vline(range_y=12, steps_y=40, exp_time=0.1)
Parameters
----------
range : float
Step size [mm].
steps : int
Scan range of the axis.
exp_time : float
Eeffective exposure time per step [s]
"""
scan_name = "vlinescan"
required_kwargs = ["exp_time", "range", "steps"]
class MeasureRasterSimple(AerotechFlyscanBase):
"""Simple raster scan
Measure a simplified relative zigzag raster scan in the X-Y plane.
The scan is relative assumes the goniometer is currently at the CENTER of
the first cell (i.e. TOP-LEFT). Each line is executed as a single continous
movement, i.e. there's no actual stepping in the X direction.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.raster_simple(exp_time=0.1, range_x=4, range_y=4, steps_x=80, steps_y=80)
Parameters
----------
exp_time : float
Effective exposure time for each cell along the X axis [s].
range_x : float
Scan step size [mm].
range_y : float
Scan step size [mm].
steps_x : int
Number of scan steps in X (fast). Only used for velocity calculation.
steps_y : int
Number of scan steps in Y (slow).
"""
scan_name = "rasterscan"
required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"]
class MeasureScreening(AerotechFlyscanBase):
"""Sample screening scan
Sample screening scan that scans intervals on the rotation axis taking
1 image/interval. This makes sure that we hit diffraction peaks if there
are any crystals.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_screening(start=42, range=180, steps=18, exp_time=0.1, oscrange=2.0)
Parameters
----------
start : float
Absolute scan start position of the omega axis [deg].
range : float
Total screened range of the omega axis relative to 'start' [deg].
steps : int
Number of blurred intervals.
exp_time : float
Exposure time per blurred interval [s].
oscrange : float
Motion blurring of each interval [deg]
delta : float
Safety margin for sub-range movements (default: 0.5) [deg].
"""
scan_name = "screeningscan"
required_kwargs = ["start", "range", "steps", "exp_time", "oscrange"]
class MeasureHelical(AerotechFlyscanBase):
"""Helical scan using the OMEGA motor
Measure an absolute continous line scan from `start` to `start` + `range`
during `move_time` on the Omega axis with PSO output.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.standard_wedge(start=42, range=10, move_time=20)
Parameters
----------
start : float
Scan start position of the axis.
range : float
Scan range of the axis.
move_time : float
Total travel time for the movement [s].
ready_rate : float, optional
No clue what is this... (default=500)
sg_start : (float, float, float, float, float)
Complete SmarGon coordinate in tuple form.
sg_end : (float, float, float, float, float)
Complete SmarGon coordinate in tuple form.
sg_steps : int
Number of steps with SmarGon.
"""
scan_name = "helicalscan"
required_kwargs = ["start", "range", "move_time", "sg_start", "sg_end", "sg_steps"]
def pre_scan(self):
"""Mostly just checking if ABR stage is ok..."""
# Smargon has no velocity control
self.smargon_start = np.array(self.caller_kwargs.get("sg_start"))
self.smargon_end = np.array(self.caller_kwargs.get("sg_end"))
self.smargon_steps = self.caller_kwargs.get("sg_steps")
self.smargon_range = self.smargon_end - self.smargon_start
self.smargon_step_size = self.smargon_range / self.smargon_steps
self.smargon_step_time = self.caller_kwargs.get("move_time") / self.smargon_steps
logger.info(f"Start:\t{self.smargon_start}")
logger.info(f"End:\t{self.smargon_end}")
logger.info(f"Steps:\t{self.smargon_steps}")
logger.info(f"Range:\t{self.smargon_range}")
logger.info(f"StepSize:\t{self.smargon_step_size}")
logger.info(f"StepTime:\t{self.smargon_step_time}")
# TODO: Move roughly to start position???
st0 = yield from self.stubs.send_rpc("shx", "omove", self.smargon_start[0])
st1 = yield from self.stubs.send_rpc("shy", "omove", self.smargon_start[1])
st2 = yield from self.stubs.send_rpc("shz", "omove", self.smargon_start[2])
st3 = yield from self.stubs.send_rpc("chi", "omove", self.smargon_start[3])
st4 = yield from self.stubs.send_rpc("phi", "omove", self.smargon_start[4])
st0.wait()
st1.wait()
st2.wait()
st3.wait()
st4.wait()
# Call super
yield from super().pre_scan()
def scan_core(self):
"""The actual scan logic comes here."""
# Kick off the run
yield from self.stubs.send_rpc_and_wait("abr", "kickoff")
logger.info("Measurement launched on the ABR stage...")
logger.info("Performing SmarGon stepping...")
for ss in range(self.smargon_steps):
sg_pos = self.smargon_start + ss * self.smargon_step_size
# Move to position but don't care
st0 = yield from self.stubs.send_rpc("shx", "omove", sg_pos[0])
st1 = yield from self.stubs.send_rpc("shy", "omove", sg_pos[1])
st2 = yield from self.stubs.send_rpc("shz", "omove", sg_pos[2])
st3 = yield from self.stubs.send_rpc("chi", "omove", sg_pos[3])
st4 = yield from self.stubs.send_rpc("phi", "omove", sg_pos[4])
t_start = time.time()
st0.wait()
st1.wait()
st2.wait()
st3.wait()
st4.wait()
t_end = time.time()
t_elapsed = t_end - t_start
time.sleep(max(self.smargon_step_time - t_elapsed, 0))
# Wait for scan task to finish
if self.abr_complete:
if self.abr_timeout is not None:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.abr_timeout)
st.wait()
else:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete")
st.wait()
class MeasureHelical2(AerotechFlyscanBase):
"""Helical scan using the OMEGA motor
Measure an absolute continous line scan from `start` to `start` + `range`
during `move_time` on the Omega axis with PSO output.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.standard_wedge(start=42, range=10, move_time=20)
Parameters
----------
start : float
Scan start position of the axis.
range : float
Scan range of the axis.
move_time : float
Total travel time for the movement [s].
ready_rate : float, optional
No clue what is this... (default=500)
sg_start : (float, float, float, float, float)
Complete SmarGon coordinate in tuple form.
sg_end : (float, float, float, float, float)
Complete SmarGon coordinate in tuple form.
sg_steps : int
Number of steps with SmarGon.
"""
scan_name = "helicalscan2"
required_kwargs = ["start", "range", "move_time", "sg_start", "sg_end", "sg_steps"]
point_id = 0
# def __init__(self, *args, parameter: dict = None, **kwargs):
# """Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
# parameters.
# """
# self.num_pos = kwargs["sg_steps"]
# super().__init__(*args, parameter=parameter, **kwargs)
def prepare_positions(self):
# Smargon has no velocity control
self.smargon_start = np.array(self.caller_kwargs.get("sg_start"))
self.smargon_end = np.array(self.caller_kwargs.get("sg_end"))
self.smargon_steps = self.caller_kwargs.get("sg_steps")
self.smargon_range = self.smargon_end - self.smargon_start
self.smargon_step_size = self.smargon_range / self.smargon_steps
self.smargon_step_time = self.caller_kwargs.get("move_time") / self.smargon_steps
logger.info(f"Start:\t{self.smargon_start}")
logger.info(f"End:\t{self.smargon_end}")
logger.info(f"Steps:\t{self.smargon_steps}")
logger.info(f"Range:\t{self.smargon_range}")
logger.info(f"StepSize:\t{self.smargon_step_size}")
logger.info(f"StepTime:\t{self.smargon_step_time}")
self.num_pos = self.smargon_steps
self.positions = np.linspace(self.smargon_start, self.smargon_end, self.smargon_steps)
self.start_pos = self.positions[0, :]
# Call super
yield from super().prepare_positions()
# def update_scan_motors(self):
# """ Update step scan motors"""
# self.scan_motors = ['shx', 'shy', 'shz', 'chi', 'phi']
def pre_scan(self):
"""Mostly just checking if ABR stage is ok..."""
# Move roughly to start position
st0 = yield from self.stubs.send_rpc("shx", "omove", self.start_pos[0])
st1 = yield from self.stubs.send_rpc("shy", "omove", self.start_pos[1])
st2 = yield from self.stubs.send_rpc("shz", "omove", self.start_pos[2])
st3 = yield from self.stubs.send_rpc("chi", "omove", self.start_pos[3])
st4 = yield from self.stubs.send_rpc("phi", "omove", self.start_pos[4])
st0.wait()
st1.wait()
st2.wait()
st3.wait()
st4.wait()
# print(f"\n\n{self.readout_priority}\n\n")
# Call super
yield from super().pre_scan()
def scan_core(self):
"""The actual scan logic comes here."""
# Kick off the run
yield from self.stubs.send_rpc_and_wait("abr", "kickoff")
logger.info("Measurement launched on the ABR stage...")
logger.info("Performing SmarGon stepping...")
for _, sg_pos in enumerate(self.positions):
# sg_pos = self.smargon_start + ss * self.smargon_step_size
# Move to position but don't care
st0 = yield from self.stubs.send_rpc("shx", "omove", sg_pos[0])
st1 = yield from self.stubs.send_rpc("shy", "omove", sg_pos[1])
st2 = yield from self.stubs.send_rpc("shz", "omove", sg_pos[2])
st3 = yield from self.stubs.send_rpc("chi", "omove", sg_pos[3])
st4 = yield from self.stubs.send_rpc("phi", "omove", sg_pos[4])
t_start = time.time()
st0.wait()
st1.wait()
st2.wait()
st3.wait()
st4.wait()
t_end = time.time()
t_elapsed = t_end - t_start
time.sleep(max(self.smargon_step_time - t_elapsed, 0))
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
# Wait for scan task to finish
if self.abr_complete:
if self.abr_timeout is not None:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.abr_timeout)
st.wait()
else:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete")
st.wait()

View File

@@ -0,0 +1,69 @@
# pylint: disable=undefined-variable
# import bec
# import bec_lib.devicemanager.DeviceContainer as dev
def rock(steps, exp_time, scan_start=None, scan_end=None, datasource=None, visual=True, **kwargs):
"""Demo step scan with plotting
This is a simple user-space demo step scan with the BEC. It be a
standard BEC scan, while still setting up the environment.
Example:
--------
ascan(dev.dccm_energy, 12,13, steps=21, exp_time=0.1, datasource=dev.dccm_xbpm)
"""
# Dummy method to check beamline status
if not bl_check_beam():
raise RuntimeError("Beamline is not in ready state")
motor = dev.dccm_theta2
if scan_start is None:
scan_start = -0.05 / dev.dccm_energy.user_readback.get()
if scan_end is None:
scan_end = 0.05 / dev.dccm_energy.user_readback.get()
if visual:
# Get or create scan specific window
window = None
for _, val in bec.gui.windows.items():
if val.title == "CurrentScan":
window = val.widget
window.clear_all()
if window is None:
window = bec.gui.new("CurrentScan")
# Draw a simploe plot in the window
dock = window.add_dock(f"ScanDisplay {motor}")
plt1 = dock.add_widget("BECWaveformWidget")
plt1.plot(x_name=motor, y_name=datasource)
plt1.set_x_label(motor)
plt1.set_y_label(datasource)
plt1.add_dap(motor, datasource, dap="LinearModel")
window.show()
print("Handing over to 'scans.line_scan'")
s = scans.line_scan(
motor,
scan_start,
scan_end,
steps=steps,
exp_time=exp_time,
datasource=datasource,
relative=True,
**kwargs,
)
if visual:
# If fitting via GUI
firt_par = plt1.get_dap_params()
else:
# Without GUI
firt_par = bec.dap.LinearModel.fit(
s, motor.name, motor.name, datasource.name, datasource.name
)
# TODO: Validate fitted position
# TODO: Move to fitted maximum
return s, firt_par

View File

@@ -0,0 +1,80 @@
# import bec
# import bec_lib.devicemanager.DeviceContainer as dev
def bl_check_beam():
"""Check beamline status before scan"""
return True
def ascan(
motor,
scan_start,
scan_end,
steps,
exp_time,
plot=None,
visual=True,
relative=False,
**kwargs,
):
"""Demo step scan with plotting
This is a simple user-space demo step scan with the BEC. It be a
standard BEC scan, while still setting up the environment.
Example:
--------
ascan(dev.dccm_energy, 12,13, steps=21, exp_time=0.1, datasource=dev.dccm_xbpm)
"""
# Dummy method to check beamline status
if not bl_check_beam():
raise RuntimeError("Beamline is not in ready state")
if visual:
# Get or create scan specific window
window = None
for _, val in bec.gui.windows.items():
if val.title == "CurrentScan":
window = val.widget
window.clear_all()
if window is None:
window = bec.gui.new("CurrentScan")
# Draw a simploe plot in the window
dock = window.add_dock(f"ScanDisplay {motor}")
plt1 = dock.add_widget("BECWaveformWidget")
plt1.plot(x_name=motor, y_name=plot)
plt1.set_x_label(motor)
plt1.set_y_label(plot)
plt1.add_dap(motor, plot, dap="LinearModel")
window.show()
print("Handing over to 'scans.line_scan'")
s = scans.line_scan(
motor,
scan_start,
scan_end,
steps=steps,
exp_time=exp_time,
plot=plot,
relative=relative,
**kwargs,
)
if visual:
# Fitting via GUI
firt_par = plt1.get_dap_params()
else:
# Fitting without GUI
firt_par = bec.dap.LinearModel.fit(
s, motor.name, motor.name, plot.name, plot.name
)
# # Some basic fit
# dkey = datasource.full_name
# NOTE: s.scan.data == bec.history[-1]
# datapoints = bec.history[-1].devices[dkey].read()[dkey]['value']
# positions
return s, firt_par

82
pyproject.toml Normal file
View File

@@ -0,0 +1,82 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "pxiii_bec"
version = "0.0.0"
description = "Custom device implementations based on the ophyd hardware abstraction layer"
requires-python = ">=3.10"
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = [
"bec_ipython_client",
"bec_lib",
"bec_server",
"bec_widgets",
"ophyd_devices",
"std_daq_client",
"rich",
"pyepics",
"pandas~=2.0",
"matplotlib",
"zmq",
]
[project.optional-dependencies]
dev = [
"black",
"isort",
"coverage",
"pylint",
"pytest",
"pytest-random-order",
"pytest-redis",
]
[project.entry-points."bec"]
plugin_bec = "pxiii_bec"
[project.entry-points."bec.deployment.device_server"]
plugin_ds_startup = "pxiii_bec.deployment.device_server.startup:run"
[project.entry-points."bec.file_writer"]
plugin_file_writer = "pxiii_bec.file_writer"
[project.entry-points."bec.scans"]
plugin_scans = "pxiii_bec.scans"
[project.entry-points."bec.ipython_client_startup"]
plugin_ipython_client_pre = "pxiii_bec.bec_ipython_client.startup.pre_startup"
plugin_ipython_client_post = "pxiii_bec.bec_ipython_client.startup"
[project.entry-points."bec.widgets.auto_updates"]
plugin_widgets_update = "pxiii_bec.bec_widgets.auto_updates:PlotUpdate"
[tool.hatch.build.targets.wheel]
include = ["*"]
[tool.isort]
profile = "black"
line_length = 100
multi_line_output = 3
include_trailing_comma = true
[tool.black]
line-length = 100
skip-magic-trailing-comma = true
[tool.pylint.basic]
# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
good-names-rgxs = [
".*scanID.*",
".*RID.*",
".*pointID.*",
".*ID.*",
".*_2D.*",
".*_1D.*",
]

View File

@@ -1,21 +0,0 @@
[metadata]
name = bec_plugins
description = BEC plugins to modify the behaviour of services within the BEC framework
long_description = file: README.md
long_description_content_type = text/markdown
url = https://gitlab.psi.ch/bec/bec
project_urls =
Bug Tracker = https://gitlab.psi.ch/bec/bec/issues
classifiers =
Programming Language :: Python :: 3
Development Status :: 3 - Alpha
Topic :: Scientific/Engineering
[options]
package_dir =
= .
packages = find:
python_requires = >=3.8
[options.packages.find]
where = .

View File

@@ -1,7 +0,0 @@
from setuptools import setup
if __name__ == "__main__":
setup(
install_requires=[],
extras_require={"dev": ["pytest", "pytest-random-order", "coverage"]},
)

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
pip install pytest
```
in your *python environment*.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).