75 Commits

Author SHA1 Message Date
Unknown MX Person
a547225af2 BEC AutoUpdate class is not executed, but fixed A3200 status monitoring 2025-06-16 13:44:46 +02:00
Unknown MX Person
44e960062a Added collimator and 2D auto updates 2025-06-16 11:52:02 +02:00
4bb274d3c8 Update copier template source to github 2025-06-11 16:18:03 +02:00
Unknown MX Person
3e340334cf fix(config): reenable some devices 2025-05-22 13:19:49 +02:00
Unknown MX Person
abaf0867cd fix(scan_history): chagned hardcoded simulated devices from PXII to real PXIII devices 2025-05-22 13:19:33 +02:00
Unknown MX Person
5ceebaa8f6 fix(auto_update): autoupdate script moved to autoupdate directory 2025-05-22 13:18:08 +02:00
gac-x06da
fc4e33ac93 WIP 2025-05-21 11:01:57 +02:00
gac-x06da
e77f9af9ca At least no exceptions 2025-05-21 11:01:57 +02:00
Unknown MX Person
e38269b96f ABR with latest BEC 2025-05-21 11:01:57 +02:00
Unknown MX Person
3940cb2da1 auto formated 2025-05-21 11:01:57 +02:00
Unknown MX Person
c79b919ed6 auto formated 2025-05-21 11:01:57 +02:00
Unknown MX Person
43c84237a9 changed config file 2025-05-21 11:01:57 +02:00
gac-x06da
588316262c WIP 2025-05-21 11:01:57 +02:00
gac-x06da
2cad486156 WIP 2025-05-21 11:01:57 +02:00
gac-x06da
689089fbbb Blacking 2025-05-21 11:01:57 +02:00
gac-x06da
6b80009bec Keyword scans for 2D 2025-05-21 11:01:57 +02:00
gac-x06da
0bded12b4e Added AttributeError to updates 2025-05-21 11:01:57 +02:00
6fedf7091f chore: remove out-of-date deployment dir 2025-05-20 09:36:15 +02:00
54fda2508b chore: migrate to copier template v1.0.0 2025-05-20 09:35:46 +02:00
ci_update_bot
19b2299840 docs: Update device list 2025-03-11 12:29:00 +00:00
gac-x06da
d18c099058 WIP 2025-03-11 13:24:06 +01:00
gac-x06da
5ca9972383 WIP 2025-03-11 13:24:06 +01:00
gac-x06da
3b9a86c8c8 WIP 2025-03-11 13:24:06 +01:00
gac-x06da
e64b5b2c3d WIP 2025-03-11 13:24:06 +01:00
gac-x06da
284914dc53 Keyword auto updates for alignment scans 2025-03-11 13:24:06 +01:00
gac-x06da
d8a178ae13 Bump 2025-03-11 13:24:06 +01:00
gac-x06da
e79a3f785a Helical scan with proggress bar 2025-03-11 13:24:06 +01:00
gac-x06da
963b775200 Fitting and pneumatic valve 2025-03-11 13:24:06 +01:00
gac-x06da
9ea249fff7 Fix from Klaus 2025-03-11 13:24:06 +01:00
gac-x06da
2930673bbc Two SmarGon axis versions 2025-03-11 13:24:06 +01:00
gac-x06da
6a45a6a357 GUI commands run individually but script runs into timeout 2025-03-11 13:24:06 +01:00
gac-x06da
a794e3f60d Workaround on motor schema expected by BEC to move 2025-03-11 13:24:06 +01:00
gac-x06da
6b4a175f78 Another SmarGon client approach 2025-03-11 13:24:06 +01:00
gac-x06da
c8a1add697 More stable SmarGon movement 2025-03-11 13:24:06 +01:00
gac-x06da
a5642b5db2 Bump to create branch 2025-03-11 13:24:06 +01:00
ci_update_bot
d1c2dbb46b docs: Update device list 2025-01-30 11:29:00 +00:00
93d79eccd4 Update pyproject.toml 2025-01-30 12:24:31 +01:00
gac-x06da
1e81aa34b9 Plugins were not meant to run standalone 2025-01-30 12:15:15 +01:00
gac-x06da
c5b97bd592 Plugins were not meant to run standalone 2025-01-30 12:13:37 +01:00
gac-x06da
42d518c2e4 Plugins were not meant to run standalone 2025-01-30 12:09:12 +01:00
gac-x06da
9a40cbd8ae Enabling AD plugin to crash 2025-01-30 11:11:15 +01:00
gac-x06da
59bd4aeb9a First helical scan passed 2025-01-30 10:43:38 +01:00
gac-x06da
a455a490c6 Flaking 2025-01-29 13:13:21 +01:00
gac-x06da
22c46f8f8e Mono scan works 2025-01-28 15:45:10 +01:00
gac-x06da
4b76d1b191 omove works without crash 2025-01-27 18:27:09 +01:00
gac-x06da
4fc31e5f5d Samcam image preview and smargon waiting 2025-01-27 17:37:53 +01:00
gac-x06da
286c7a4bff Array preview also works 2025-01-27 15:13:16 +01:00
gac-x06da
7ab682817f Working samcam stream preview with StdDaq client 2025-01-27 12:25:27 +01:00
gac-x06da
ac3d82f7ef Merge branch 'main' into feature/device-aerotech-a3200 2025-01-24 16:21:49 +01:00
gac-x06da
023e0aab2e Fixing SmarGon axes 2025-01-24 16:17:04 +01:00
gac-x06da
21bd57393f Blacking and cleanup of unused code 2025-01-24 15:42:46 +01:00
gac-x06da
82d51649ee Blacking and cleanup of unused code 2025-01-24 15:21:59 +01:00
gac-x06da
015bf2ee3b Blacking 2025-01-24 15:13:24 +01:00
gac-x06da
24302c244d Fix for the new scan type validation 2025-01-24 15:06:45 +01:00
gac-x06da
a8990f8de2 axis client getting ready 2025-01-22 17:42:50 +01:00
gac-x06da
8da2ed4102 BEC style A3200 seems working 2025-01-20 18:41:04 +01:00
gac-x06da
03a5850bbf SmarGon reads 2025-01-15 17:44:46 +01:00
gac-x06da
d3d016108e First tries with ABR 2025-01-15 14:46:51 +01:00
gac-x06da
0b99a82ae9 Waiting for raster scan works 2024-12-17 17:41:47 +01:00
Unknown MX Person
add46d8b0d A3200 cleanup 2024-12-13 17:44:52 +01:00
Unknown MX Person
78c75b1769 Moving towards beamline startup 2024-11-13 14:30:57 +01:00
Unknown MX Person
b0703552f2 Aerotech scans work nicely 2024-09-20 14:38:04 +02:00
Unknown MX Person
14ca9bd74a Daily commit mit some patching 2024-09-19 17:30:13 +02:00
Unknown MX Person
2563471ac8 Dailiy commit 2024-09-18 18:26:54 +02:00
Unknown MX Person
23aadabfd1 First Aerotech scan works 2024-09-18 12:49:33 +02:00
Unknown MX Person
3bf21ff647 Both ABR stage and scan instantiates, need Zac to test it safely 2024-09-09 13:23:03 +02:00
Unknown MX Person
602317faa8 A3200 starts to get ready for scanning 2024-08-28 13:23:06 +02:00
Unknown MX Person
13c6d7b8fb Starting to move things to scans 2024-08-27 17:58:33 +02:00
e7fd8e453d Still a lot to do 2024-07-16 14:32:09 +02:00
4a6e4092ca Merge branch 'main' into feature/device-aerotech-a3200 2024-07-09 11:15:13 +02:00
65800812a5 ECMC virtal energy motors 2024-06-26 10:53:54 +02:00
2ee2b25c21 Cleaned up imports and PVs 2024-06-21 12:13:57 +02:00
1a204693dc Added rocking 2024-06-13 13:34:02 +02:00
20dff942c1 Basic PVPositioner works 2024-06-13 12:41:32 +02:00
eec897f713 Device draft from office 2024-06-13 12:07:03 +02:00
61 changed files with 8403 additions and 514 deletions

9
.copier-answers.yml Normal file
View File

@@ -0,0 +1,9 @@
# Do not edit this file!
# It is needed to track the repo template version, and editing may break things.
# This file will be overwritten by copier on template updates.
_commit: v1.0.0
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: pxiii_bec
widget_plugins_input: null

3
.git_hooks/post-commit Normal file
View File

@@ -0,0 +1,3 @@
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
semantic-release changelog -D version_variable=$SCRIPT_DIR/../../semantic_release/__init__.py:__version__
semantic-release version -D version_variable=$SCRIPT_DIR/../../semantic_release/__init__.py:__version__

3
.git_hooks/pre-commit Normal file
View File

@@ -0,0 +1,3 @@
black --line-length=100 $(git diff --cached --name-only --diff-filter=ACM -- '*.py')
isort --line-length=100 --profile=black --multi-line=3 --trailing-comma $(git diff --cached --name-only --diff-filter=ACM -- '*.py')
git add $(git diff --cached --name-only --diff-filter=ACM -- '*.py')

3
.gitignore vendored
View File

@@ -8,6 +8,9 @@
**/.pytest_cache
**/*.egg*
# recovery_config files
recovery_config_*
# file writer data
**.h5

View File

@@ -1,7 +1,7 @@
include:
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "pxiii"
target: "pxiii_bec"
branch: $CHILD_PIPELINE_BRANCH
- file: /templates/plugin-repo-template.yml
inputs:
name: pxiii_bec
target: pxiii_bec
branch: $CHILD_PIPELINE_BRANCH
project: bec/awi_utils

View File

@@ -1,6 +1,7 @@
BSD 3-Clause License
Copyright (c) 2024, Paul Scherrer Institute
Copyright (c) 2025, Paul Scherrer Institute
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
@@ -25,4 +26,4 @@ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

1
bin/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
# Add anything you don't want to check in to git, e.g. very large files

View File

@@ -1,11 +0,0 @@
# This file is used to select the BEC and Ophyd Devices version for the auto deployment process.
# Do not edit this file unless you know what you are doing!
# The version can be a git tag, branch or commit hash.
# BEC version to use
BEC_AUTODEPLOY_VERSION="master"
# ophyd_devices version to use
OPHYD_DEVICES_AUTODEPLOY_VERSION="master"

View File

@@ -1,18 +0,0 @@
redis:
host: localhost
port: 6379
mongodb:
host: localhost
port: 27017
scibec:
host: http://[::1]
port: 3030
beamline: "PXIII"
service_config:
general:
reset_queue_on_cancel: True
enforce_ACLs: False
file_writer:
plugin: default_NeXus_format
base_path: ./

View File

@@ -1,29 +0,0 @@
# deployment script to be translated to Ansible
# can be removed once we have the autodeployment in place
BEAMLINE_REPO=gitlab.psi.ch:bec/pxiii-bec.git
git clone git@$BEAMLINE_REPO
module add psi-python311/2024.02
# start redis
docker run --network=host --name redis-bec -d redis
# alternative:
# conda install -y redis; redis-server &
# get the target versions for ophyd_devices and BEC
source ./pxiii-bec/deployment/autodeploy_versions
git clone -b $OPHYD_DEVICES_AUTODEPLOY_VERSION https://gitlab.psi.ch/bec/ophyd_devices.git
git clone -b $BEC_AUTODEPLOY_VERSION https://gitlab.psi.ch/bec/bec.git
# install BEC
cd bec
source ./bin/install_bec_dev.sh
cd ../
pip install -e ./pxiii-bec
# start the BEC server
bec-server start --config ./pxiii-bec/deployment/bec-server-config.yaml

View File

@@ -0,0 +1,135 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>1801</width>
<height>1459</height>
</rect>
</property>
<property name="minimumSize">
<size>
<width>1801</width>
<height>1459</height>
</size>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QTabWidget" name="tabWidget">
<property name="currentIndex">
<number>0</number>
</property>
<widget class="QWidget" name="tab">
<attribute name="title">
<string>Control Panel</string>
</attribute>
<layout class="QGridLayout" name="gridLayout" rowstretch="3,4" columnstretch="2,5">
<item row="0" column="0">
<widget class="Waveform" name="waveform"/>
</item>
<item row="0" column="1">
<widget class="ScanControl" name="scan_control"/>
</item>
<item row="1" column="0">
<widget class="ScanHistory" name="scan_history"/>
</item>
<item row="1" column="1">
<widget class="BECQueue" name="bec_queue"/>
</item>
</layout>
</widget>
<widget class="QWidget" name="tab_2">
<attribute name="title">
<string>Logbook</string>
</attribute>
<layout class="QGridLayout" name="gridLayout_3">
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="font">
<font>
<pointsize>24</pointsize>
</font>
</property>
<property name="text">
<string>Coming soon...</string>
</property>
</widget>
</item>
</layout>
</widget>
<widget class="QWidget" name="tab_3">
<attribute name="title">
<string>Take a break</string>
</attribute>
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="0">
<widget class="Minesweeper" name="minesweeper"/>
</item>
<item row="0" column="1">
<spacer name="horizontalSpacer">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>1073</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
<item row="1" column="0">
<spacer name="verticalSpacer">
<property name="orientation">
<enum>Qt::Orientation::Vertical</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>20</width>
<height>40</height>
</size>
</property>
</spacer>
</item>
</layout>
</widget>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>ScanControl</class>
<extends>QWidget</extends>
<header>scan_control</header>
</customwidget>
<customwidget>
<class>Waveform</class>
<extends>QWidget</extends>
<header>waveform</header>
</customwidget>
<customwidget>
<class>BECQueue</class>
<extends>QWidget</extends>
<header>bec_queue</header>
</customwidget>
<customwidget>
<class>Minesweeper</class>
<extends>QWidget</extends>
<header>minesweeper</header>
</customwidget>
<customwidget>
<class>ScanHistory</class>
<extends>QWidget</extends>
<header>scan_history</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1 +1 @@
from .auto_updates import PlotUpdate

View File

@@ -1,26 +0,0 @@
from bec_widgets.cli.auto_updates import AutoUpdates, ScanInfo
class PlotUpdate(AutoUpdates):
# def simple_line_scan(self, info: ScanInfo) -> None:
# """
# Simple line scan.
# """
# dev_x = info.scan_report_devices[0]
# dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
# if not dev_y:
# return
# self.figure.clear_all()
# plt = self.figure.plot(dev_x, dev_y)
# plt.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def handler(self, info: ScanInfo) -> None:
# EXAMPLES:
# if info.scan_name == "line_scan" and info.scan_report_devices:
# self.simple_line_scan(info)
# return
# if info.scan_name == "grid_scan" and info.scan_report_devices:
# self.run_grid_scan_update(info)
# return
super().handler(info)

View File

@@ -0,0 +1 @@
from .auto_updates import AutoUpdates

View File

@@ -0,0 +1,90 @@
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_lib.messages import ScanStatusMessage
from bec_widgets.cli.rpc.rpc_base import RPCResponseTimeoutError
class PlotUpdate(AutoUpdates):
#######################################################################
################# GUI Callbacks #######################################
#######################################################################
def on_start(self) -> None:
"""
Procedure to run when the auto updates are enabled.
"""
self.start_default_dock()
def on_stop(self) -> None:
"""
Procedure to run when the auto updates are disabled.
"""
def on_scan_open(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan starts.
Args:
msg (ScanStatusMessage): The scan status message.
"""
# if "kwargs" in msg.request_inputs:
# dev_plt = msg.request_inputs["kwargs"].get("plot", None)
# if dev_plt is not None:
# # Handle depending on scan dimension
# if len(msg.scan_report_devices) == 1:
# dev_x = msg.scan_report_devices[0]
# # Set the dock to the waveform widget
# wf = self.set_dock_to_widget("Waveform")
# # Clear the waveform widget and plot the data
# wf.clear_all()
# wf.plot(
# x_name=dev_x,
# y_name=dev_plt,
# label=f"Scan {msg.info.scan_number} - {dev_plt}",
# title=f"Scan {msg.info.scan_number}",
# x_label=dev_x,
# y_label=dev_plt,
# )
# if len(msg.scan_report_devices) == 2:
# dev_x = msg.scan_report_devices[0]
# dev_y = msg.scan_report_devices[1]
# # Set the dock to the waveform widget
# wf = self.set_dock_to_widget("Waveform")
# # Clear the waveform widget and plot the data
# wf.clear_all()
# wf.plot(
# x_name=dev_x,
# y_name=dev_y,
# z_name=dev_plt,
# label=f"Scan {msg.info.scan_number} - {dev_plt}",
# title=f"Scan {msg.info.scan_number} - {dev_plt}",
# x_label=dev_x,
# y_label=dev_y,
# z_label=dev_plt,
# )
# elif msg.scan_name == "line_scan" and msg.scan_report_devices:
# return self.simple_line_scan(msg)
# elif msg.scan_name == "grid_scan" and msg.scan_report_devices:
# return self.simple_grid_scan(msg)
# elif msg.scan_report_devices:
# return self.best_effort(msg)
return None
def on_scan_closed(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan ends.
Args:
msg (ScanStatusMessage): The scan status message.
"""
def on_scan_abort(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan is aborted.
Args:
msg (ScanStatusMessage): The scan status message.
"""

View File

@@ -0,0 +1,40 @@
# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
logger = bec_logger.logger
# pylint: skip-file
_Widgets = {
"ScanHistory": "ScanHistory",
}
class ScanHistory(RPCBase):
@rpc_call
def select_scan_from_history(self, value: "int") -> "None":
"""
Set scan from CLI.
Args:
value (int) : value from history -1 ...-10000
"""
@rpc_call
def add_scan_from_history(self) -> "None":
"""
Load selected scan from history.
"""
@rpc_call
def clear_plot(self) -> "None":
"""
Delete all curves on the plot.
"""

View File

@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from pxiii_bec.bec_widgets.widgets.scan_history.scan_history_plugin import ScanHistoryPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(ScanHistoryPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -0,0 +1,191 @@
from __future__ import annotations
import os
from typing import TYPE_CHECKING, TypedDict
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QVBoxLayout, QWidget
logger = bec_logger.logger
if TYPE_CHECKING:
from qtpy.QtWidgets import QPushButton, QLabel, QSpinBox
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.editors.text_box.text_box import TextBox
class ScanHistoryUIComponents(TypedDict):
waveform: Waveform
metadata_text_box: TextBox
monitor_label: QLabel
monitor_combobox: DeviceComboBox
history_label: QLabel
history_spin_box: QSpinBox
history_add: QPushButton
history_clear: QPushButton
class ScanHistory(BECWidget, QWidget):
USER_ACCESS = ["select_scan_from_history", "add_scan_from_history", "clear_plot"]
PLUGIN = True
ui_file = "./scan_history.ui"
components: ScanHistoryUIComponents
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
self.components: ScanHistoryUIComponents = {
"waveform" : self.ui.waveform,
"metadata_text_box" : self.ui.metadata_text_box,
"monitor_label" : self.ui.monitor_label,
"monitor_combobox" : self.ui.monitor_combobox,
"history_label" : self.ui.history_label,
"history_spin_box" : self.ui.history_spin_box,
"history_add" : self.ui.history_add,
"history_clear" : self.ui.history_clear,
}
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.components['monitor_combobox'].apply_filter = False
self.components['monitor_combobox'].devices = ['dccm_diode_bottom', 'dccm_diode_top', 'dccm_xbpm', 'ssxbpm', 'xbox_diode']
self.components['history_spin_box'].setMinimum(-10000)
self.components['history_spin_box'].setMaximum(-1)
self.components['history_spin_box'].valueChanged.connect(self._scan_history_selected)
self._scan_history_selected(-1)
self.components['history_spin_box'].setValue(-1)
self.components['history_add'].setText("Load")
self.components['history_add'].setStyleSheet(
"background-color: #129490; color: white; font-weight: bold; font-size: 12px;"
)
self.components['history_clear'].setText("Clear")
self.components['history_clear'].setStyleSheet(
"background-color: #065143; color: white; font-weight: bold; font-size: 12px;"
)
self.components['history_add'].clicked.connect(self._refresh_plot)
self.components['history_clear'].clicked.connect(self.clear_plot)
self.setWindowTitle("Scan History")
self._scan_history_selected(-1)
@SafeSlot()
def add_scan_from_history(self) -> None:
"""Load selected scan from history."""
self.components['history_add'].click()
@SafeSlot()
def clear_plot(self) -> None:
"""Delete all curves on the plot."""
self.components['waveform'].clear_all()
@SafeSlot()
def _refresh_plot(self) -> None:
"""Refresh plot."""
spin_box_value = self.components['history_spin_box'].value()
self._check_scan_in_history(spin_box_value)
# Get the data from the client
data = self.client.history[spin_box_value]
# Check that the plot does not already have a curve with the same data
scan_number = int(data.metadata.bec['scan_number'])
monitor_name = self.components['monitor_combobox'].currentText()
# Get signal hints
signal_name = getattr(self.client.device_manager.devices, monitor_name)._hints
signal_name = signal_name[0] if len(signal_name)>0 else signal_name
curve_label = f"Scan-{scan_number}-{monitor_name}-{signal_name}"
if len([curve for curve in self.components['waveform'].curves if curve.config.label == curve_label]):
return
if not hasattr(data.devices, monitor_name):
raise ValueError(f"Device {monitor_name} not found in data.")
# Get scan motors and check that the plot x_axis motor is the same as the scan motor, if not, clear the plot
scan_motors = [motor.decode() for motor in data.metadata.bec['scan_motors']]
x_motor_name = self.components['waveform'].x_mode
if x_motor_name not in scan_motors:
self.clear_plot()
self.components['waveform'].x_mode = x_motor_name = scan_motors[0]
# fetching the data
monitor_data = getattr(data.devices, monitor_name).read()[signal_name]['value']
motor_data = getattr(data.devices, x_motor_name).read()[x_motor_name]['value']
# Plot custom curve, with custom label
self.components['waveform'].plot(x=motor_data, y=monitor_data, label=curve_label)
x_label = f"{x_motor_name} / [{getattr(self.client.device_manager.devices, x_motor_name).egu()}]"
self.components['waveform'].x_label = x_label
def _check_scan_in_history(self, history_value:int) -> None:
"""
Check if scan is in history.
Args:
history_value (int): Value from history -1...-10000
"""
if len(self.client.history) < abs(history_value):
self.components['metadata_text_box'].set_plain_text(f"Scan history does not have the request scan {history_value} of history with length: {len(self.client.history)}")
return
def select_scan_from_history(self, value:int) -> None:
"""
Set scan from CLI.
Args:
value (int) : value from history -1 ...-10000
"""
if value >=0:
raise ValueError(f"Value must be smaller or equal -1, provided {value}")
self.components['history_spin_box'].setValue(value)
@SafeSlot(int)
def _scan_history_selected(self, spin_box_value:int) -> None:
self._check_scan_in_history(spin_box_value)
data = self.client.history[spin_box_value]
data.metadata.bec['scan_motors'][0].decode()
text = str(data)
scan_motor_text = "\n" + "Scan Motors: "
for motor in data.metadata.bec['scan_motors']:
scan_motor_text += f" {motor.decode()}"
self.components['metadata_text_box'].set_plain_text(text + scan_motor_text)
@SafeSlot(str)
def _set_x_axis(self, device_x:str) -> None:
self.components['waveform'].x_mode = device_x
@SafeSlot(str)
def _plot_new_device(self, device:str) -> None:
# if len(curve for curve in self.components["waveform"].curves if curve.config.label == f"{device}-{device}":
self.components["waveform"].plot(device)
if __name__ == "__main__": # pragma: no cover
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
widget = ScanHistory()
widget.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1 @@
{'files': ['scan_history.py']}

View File

@@ -0,0 +1,115 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>955</width>
<height>796</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout_2" stretch="9,3">
<item>
<widget class="Waveform" name="waveform">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>0</width>
<height>0</height>
</size>
</property>
</widget>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout_2">
<item>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QLabel" name="monitor_label">
<property name="font">
<font/>
</property>
<property name="text">
<string>BPM Monitor</string>
</property>
</widget>
</item>
<item>
<widget class="DeviceComboBox" name="monitor_combobox"/>
</item>
<item>
<widget class="QLabel" name="history_label">
<property name="text">
<string>Scan History</string>
</property>
</widget>
</item>
<item>
<widget class="QSpinBox" name="history_spin_box"/>
</item>
<item>
<widget class="QPushButton" name="history_add">
<property name="text">
<string>Add scan</string>
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="history_clear">
<property name="text">
<string>clear all</string>
</property>
</widget>
</item>
</layout>
</item>
<item>
<widget class="TextBox" name="metadata_text_box">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="MinimumExpanding">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>795</width>
<height>191</height>
</size>
</property>
</widget>
</item>
</layout>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>TextBox</class>
<extends>QWidget</extends>
<header>text_box</header>
</customwidget>
<customwidget>
<class>DeviceComboBox</class>
<extends>QComboBox</extends>
<header>device_combobox</header>
</customwidget>
<customwidget>
<class>Waveform</class>
<extends>QWidget</extends>
<header>waveform</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -0,0 +1,54 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from pxiii_bec.bec_widgets.widgets.scan_history.scan_history import ScanHistory
DOM_XML = """
<ui language='c++'>
<widget class='ScanHistory' name='scan_history'>
</widget>
</ui>
"""
class ScanHistoryPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = ScanHistory(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(ScanHistory.ICON_NAME)
def includeFile(self):
return "scan_history"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "ScanHistory"
def toolTip(self):
return "ScanHistory"
def whatsThis(self):
return self.toolTip()

2107
pxiii_bec/config_saved.yaml Normal file

File diff suppressed because it is too large Load Diff

2094
pxiii_bec/config_saved.yaml~ Normal file

File diff suppressed because it is too large Load Diff

View File

View File

@@ -0,0 +1,11 @@
import os
def setup_epics_ca():
# os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "NO"
# os.environ["EPICS_CA_ADDR_LIST"] = "129.129.122.255 sls-x12sa-cagw.psi.ch:5836"
os.environ["PYTHONIOENCODING"] = "latin1"
def run():
setup_epics_ca()

View File

@@ -1,55 +0,0 @@
# -*- coding: utf-8 -*-
"""
Created on Wed Aug 31 10:27:18 2022
Database migration from a sensible format to the BEC YAML file format
@author: mohacsi_i
"""
import yaml
import yaml.representer
def MigrateYamlFile(filein, fileout):
""" Migrates an absolutely minimal YAML config file to the format
required by the BEC (i.e. adding default fields).
"""
fp = open(filein, "r")
lut = yaml.load(fp, Loader=yaml.Loader)
# Allocate empty database
db = dict()
for k,v in lut.items():
new = v
# Adding defaults
if 'onFailure' not in new:
new['onFailure'] = "buffer"
if 'enabled' not in new:
new['enabled'] = True
if 'readoutPriority' not in new:
new['readoutPriority'] = "monitored"
if 'readOnly' not in new:
new['readOnly'] = bool(new['deviceClass'] in ('ophyd.EpicsSignalRO'))
if 'softwareTrigger' not in new:
new['softwareTrigger'] = False
if new['deviceClass'] == "ophyd.EpicsSignalRO":
if "read_pv" not in new['deviceConfig']:
new["deviceConfig"]["read_pv"] = new["deviceConfig"]["prefix"]
del new["deviceConfig"]["prefix"]
db[k] = new
with open(fileout, 'w') as stream:
yaml.dump(db, stream, default_flow_style=None, sort_keys=False)
# Automatically start simulation if directly invoked
if __name__ == "__main__":
MigrateYamlFile("./x06da_compact.lmay", "x06da_device_config.yaml")

View File

@@ -1,199 +0,0 @@
# OP before mono
slh_trxr:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-SLH:TRXR'
slh_trxw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-SLH:TRXW'
fi1_try:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-FI1:TRY'
# DCCM crystal 1
dccm_pitch1:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:PITCH1'
dccm_energy1:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:ENERGY1'
dccm_diode:
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
prefix: 'X06DA-OP-XPM1:BOT:READOUT'
# DCCM crystal 2
dccm_pitch2:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:PITCH2'
dccm_energy2:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:ENERGY2'
dccm_xbpm:
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
prefix: 'X06DA-OP-XBPM1:SumAll:MeanValue_RBV'
# DCCM common
dccm_energy:
description: Monochromator energy using ECMC virtual motors
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:_ENERGY'
dccm_eoffset:
description: Monochromator energy offset for ECMC virtual motors
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-OP-DCCM:_EOFFSET'
# Secondary source XBPM
ssxbpm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSBPM1:TRX'
ssxbpm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSBPM1:TRY'
ssslit_trxr:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSSH1:TRXR'
ssslit_trxw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSSH1:TRXW'
ssslit_tryt:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSSV1:TRYT'
ssslit_tryb:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSSV1:TRYB'
ssxi1_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSXI1:TRX'
ssxi1_try:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES-SSXI1:TRY'
# Vertical focusing mirror
vfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRXU'
enabled: false
vfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRXD'
enabled: false
vfm_tryuw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRYUW'
vfm_tryr:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRYR'
vfm_trydw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRYDW'
vfm_pitch:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:PITCH'
vfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:YAW'
enabled: false
vfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:ROLL'
enabled: false
vfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRX'
enabled: false
vfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-VFM:TRY'
# Horizontal focusing mirror
hfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRXU'
enabled: false
hfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRXD'
enabled: false
hfm_tryur:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRYUR'
hfm_tryw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRYW'
hfm_trydr:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRYDR'
hfm_pitch:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:PITCH'
enabled: false
hfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:YAW'
enabled: false
hfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:ROLL'
enabled: false
hfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRX'
enabled: false
hfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: 'X06DA-ES1-HFM:TRY'
# Exposure box signals
xbox_diode:
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
prefix: 'X06DA-ES-DI1:READOUT'
bstop_diode:
deviceClass: ophyd.EpicsSignalRO
deviceConfig:
prefix: 'X06DA-ES-BS:READOUT'

View File

@@ -1,5 +1,129 @@
sls_current:
description: sls current
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'ARS07-DPCT-0100:CURR', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
vg0_press:
description: VG0 pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-FE-VMCC-0000:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
abs_press:
description: Absorber pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-FE-ABS1-VMCC-1010:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
sldi_cenx:
description: FE slit-diaphragm horizontal center
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENTERX'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_sizex:
description: FE slit-diaphragm horizontal size
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:SIZEX'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_ceny:
description: FE slit-diaphragm vertical center
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENTERY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_sizey:
description: FE slit-diaphragm vertical size
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:SIZEY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
fecmi_try:
description: FE collimating mirror try
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-MI1:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
fecmi_pitch:
description: FE collimating mirror pitch
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-MI1:PITCH'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
fecmi_bend:
description: FE collimating mirror bend
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-MI1:BEND1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
slh_press:
description: OP slit pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-SLH-VMFR-1010:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
slh_trxr:
description: OP slit inner blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-SLH:TRXR'}
onFailure: buffer
@@ -8,6 +132,7 @@ slh_trxr:
readOnly: false
softwareTrigger: false
slh_trxw:
description: OP slit outer blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-SLH:TRXW'}
onFailure: buffer
@@ -16,96 +141,106 @@ slh_trxw:
readOnly: false
softwareTrigger: false
fi1_try:
description: Beam attenuator motion before mono
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-FI1:TRY'}
deviceConfig: {prefix: 'X06DA-OP-FI1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_pitch1:
dccm_theta1:
description: Monochromator pitch 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH1'}
deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_energy1:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:ENERGY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_diode:
dccm_diode_top:
description: Top diode between mono crystals
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XPM1:BOT:READOUT'}
deviceConfig: {read_pv: 'X06DA-OP-XPM1:TOP:READOUT', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
dccm_pitch2:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH2'}
dccm_diode_bottom:
description: Bottom diode between mono crystals
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XPM1:BOT:READOUT', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
readOnly: true
softwareTrigger: false
dccm_energy2:
dccm_theta2:
description: Monochromator pitch 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:ENERGY2'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_energy:
description: Monochromator energy using ECMC virtual motors
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:_ENERGY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_eoffset:
description: Monochromator energy offset between crystals using ECMC virtual motors
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:_EOFFSET'}
deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA2'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_xbpm:
description: XBPM total intensity after monochromator
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XBPM1:SumAll:MeanValue_RBV'}
deviceConfig: {read_pv: 'X06DA-OP-XBPM1:SumAll:MeanValue_RBV', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
ssxbpm_trx:
dccm_energy:
description: Monochromator energy
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRX'}
deviceConfig: {prefix: 'X06DA-OP-DCCM:ENERGY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_eoffset:
description: Monochromator energy offset
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:EOFFSET'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxbpm_trx:
description: XBPM motion before secondary source
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxbpm_try:
description: XBPM motion before secondary source
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRY'}
deviceConfig: {prefix: 'X06DA-ES-SSBPM1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxbpm:
description: XBPM before secondary source
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-SSBPM1:SumAll:MeanValue_RBV'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
ssslit_trxr:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXR'}
onFailure: buffer
@@ -114,6 +249,7 @@ ssslit_trxr:
readOnly: false
softwareTrigger: false
ssslit_trxw:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSH1:TRXW'}
onFailure: buffer
@@ -122,6 +258,7 @@ ssslit_trxw:
readOnly: false
softwareTrigger: false
ssslit_tryt:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYT'}
onFailure: buffer
@@ -130,6 +267,7 @@ ssslit_tryt:
readOnly: false
softwareTrigger: false
ssslit_tryb:
description: Secondary source blade motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSSV1:TRYB'}
onFailure: buffer
@@ -138,16 +276,18 @@ ssslit_tryb:
readOnly: false
softwareTrigger: false
ssxi1_trx:
description: Secondary source diagnostic screen motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRX'}
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
ssxi1_try:
description: Secondary source diagnostic screen motion
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRY'}
deviceConfig: {prefix: 'X06DA-ES-SSXI1:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -155,7 +295,7 @@ ssxi1_try:
softwareTrigger: false
vfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRXU'}
deviceConfig: {prefix: 'X06DA-ES-VFM:TRXU'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -163,39 +303,40 @@ vfm_trxu:
softwareTrigger: false
vfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRXD'}
deviceConfig: {prefix: 'X06DA-ES-VFM:TRXD'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_tryuw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRYUW'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_tryr:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRYR'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
vfm_trydw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRYDW'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# vfm_tryuw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYUW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# vfm_tryr:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# vfm_trydw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-VFM:TRYDW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
vfm_pitch:
description: KB mirror vertical steering
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:PITCH'}
deviceConfig: {prefix: 'X06DA-ES-VFM:PITCH'}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -203,7 +344,7 @@ vfm_pitch:
softwareTrigger: false
vfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:YAW'}
deviceConfig: {prefix: 'X06DA-ES-VFM:YAW'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -211,7 +352,7 @@ vfm_yaw:
softwareTrigger: false
vfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:ROLL'}
deviceConfig: {prefix: 'X06DA-ES-VFM:ROLL'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -219,7 +360,7 @@ vfm_roll:
softwareTrigger: false
vfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRX'}
deviceConfig: {prefix: 'X06DA-ES-VFM:TRX'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -227,7 +368,7 @@ vfm_trx:
softwareTrigger: false
vfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-VFM:TRY'}
deviceConfig: {prefix: 'X06DA-ES-VFM:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -235,7 +376,7 @@ vfm_try:
softwareTrigger: false
hfm_trxu:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRXU'}
deviceConfig: {prefix: 'X06DA-ES-HFM:TRXU'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -243,39 +384,40 @@ hfm_trxu:
softwareTrigger: false
hfm_trxd:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRXD'}
deviceConfig: {prefix: 'X06DA-ES-HFM:TRXD'}
enabled: false
onFailure: buffer
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_tryur:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRYUR'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_tryw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRYW'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
hfm_trydr:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRYDR'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# hfm_tryur:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYUR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# hfm_tryw:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYW'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
# hfm_trydr:
# deviceClass: ophyd.EpicsMotor
# deviceConfig: {prefix: 'X06DA-ES-HFM:TRYDR'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: false
# softwareTrigger: false
hfm_pitch:
description: KB mirror horizontal steering
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:PITCH'}
deviceConfig: {prefix: 'X06DA-ES-HFM:PITCH'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -283,7 +425,7 @@ hfm_pitch:
softwareTrigger: false
hfm_yaw:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:YAW'}
deviceConfig: {prefix: 'X06DA-ES-HFM:YAW'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -291,7 +433,7 @@ hfm_yaw:
softwareTrigger: false
hfm_roll:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:ROLL'}
deviceConfig: {prefix: 'X06DA-ES-HFM:ROLL'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -299,7 +441,7 @@ hfm_roll:
softwareTrigger: false
hfm_trx:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRX'}
deviceConfig: {prefix: 'X06DA-ES-HFM:TRX'}
enabled: false
onFailure: buffer
readoutPriority: monitored
@@ -307,13 +449,59 @@ hfm_trx:
softwareTrigger: false
hfm_try:
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES1-HFM:TRY'}
deviceConfig: {prefix: 'X06DA-ES-HFM:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
# xbox_xbpm:
# description: Exposure box XBPM
# deviceClass: ophyd.EpicsSignalRO
# deviceConfig: {read_pv: 'X06DA-ES-XBBPM1:SumAll:MeanValue_RBV'}
# onFailure: buffer
# enabled: true
# readoutPriority: monitored
# readOnly: true
# softwareTrigger: false
xbox_fil1:
description: Exposure box filter wheel 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI1:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil2:
description: Exposure box filter wheel 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI2:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil3:
description: Exposure box filter wheel 3
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI3:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_fil4:
description: Exposure box filter wheel 4
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-FI4:ROZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
xbox_diode:
description: Exposure box diode
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DI1:READOUT'}
onFailure: buffer
@@ -321,11 +509,269 @@ xbox_diode:
readoutPriority: monitored
readOnly: true
softwareTrigger: false
bstop_diode:
gonpos:
description: Sample sensor distance
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-BS:READOUT'}
deviceConfig: {read_pv: 'X06DA-ES-DF1:CBOX-USER1', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
gonvalid:
description: Sample in valid distance
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-DF1:CBOX-CMP1', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
samzoom:
description: Sample microscope zoom
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-SAMCAM:ZOOM'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
samcam:
description: Sample camera aggregate device
deviceClass: pxiii_bec.devices.SamCamDetector
deviceConfig: {prefix: 'X06DA-SAMCAM:'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
samstream:
description: Sample camera ZMQ stream
deviceClass: pxiii_bec.devices.StdDaqPreviewDetector
deviceConfig:
url: 'tcp://129.129.110.12:9089'
deviceTags:
- detector
enabled: true
readoutPriority: async
readOnly: false
softwareTrigger: false
# samimg:
# description: Sample camera image from EPICS
# deviceClass: pxiii_bec.devices.NDArrayPreview
# deviceConfig:
# prefix: 'X06DA-SAMCAM:image1:'
# deviceTags:
# - detector
# enabled: true
# readoutPriority: async
# readOnly: false
# softwareTrigger: false
bstop_pneum:
description: Beamstop pneumatic in-out
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_x:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_y:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_z:
description: Beamstop translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-BS:TRZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
bstop_pneum:
description: Beamstop pneumatic
deviceClass: pxiii_bec.devices.PneumaticValve
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS', kind: 'config', auto_monitor: true, put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false
bstop_diode:
description: Beamstop diode
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-ES-BS:READOUT', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
frontlight:
description: Microscope frontlight
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-FL:SET-BRGHT', kind: 'config', put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
backlight:
description: Backlight reflector
deviceClass: pxiii_bec.devices.PneumaticValve
deviceConfig: {read_pv: 'X06DA-ES-BL:GET-POS', write_pv: 'X06DA-ES-BL:SET-POS', kind: 'config', auto_monitor: true, put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false
gmx:
description: ABR horizontal stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMX', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
gmy:
description: ABR vertical stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMY', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
gmz:
description: ABR axial stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:GMZ', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
omega:
description: ABR rotation stage
deviceClass: pxiii_bec.devices.A3200Axis
deviceConfig: {prefix: 'X06DA-ES-DF1:OMEGA', base_pv: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
abr:
description: Aerotech ABR motion system
deviceClass: pxiii_bec.devices.AerotechAbrStage
deviceConfig: {prefix: 'X06DA-ES'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
coll_x:
description: Collimator X
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-COL:TRX1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
coll_y:
description: Collimator Y
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-COL:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shx:
description: SmarGon X axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: -2, high_limit: 2, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shy:
description: SmarGon Y axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: -2, high_limit: 2, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
shz:
description: SmarGon Z axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: 10, high_limit: 22, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
chi:
description: SmarGon CHI axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: 0, high_limit: 40, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
phi:
description: SmarGon PHI axis
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
det_y:
description: Pilatus height
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
det_z:
description: Pilatus translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false

426
pxiii_bec/devices/A3200.py Normal file
View File

@@ -0,0 +1,426 @@
"""
``Aerotech`` --- Aerotech control software
******************************************
This module provides an object to control the Aerotech Abr rotational stage.
Methods in the Abr class
========================
Standard bluesky interface:
AerotechAbrStage.configure(d={...})
AerotechAbrStage.kickoff()
AerotechAbrStage.stop()
Additional bluesky functionality:
Aerotech.is_homed()
Aerotech.do_homing(wait=True)
Aerotech.get_ready(ostart=None, orange=None, etime=None, wait=True)
Aerotech.is_done()
Aerotech.is_ready()
Aerotech.is_busy()
Aerotech.start_exposure()
Aerotech.wait_status(status)
Aerotech.move(angle, wait=False, speed=None)
Aerotech.set_shutter(state)
Returns the axis mode: ``-ES-DF1:AXES-MODE``
Examples
========
abr = AerotechAbrStage(prefix="X06DA-ES-DF1", name="abr")
# move omega to 270.0 degrees
abr.omega = 270.0
# move omega to 180 degrees and wait for movement to finish
abr.move(180, wait=True)
# move omega to 3000 degrees at 360 degrees/s and wait for movement to finish
abr.move(3000, velocity=360, wait=True)
# stop any movement
abr.stop() # this function only returns after the STATUS is back to OK
"""
import time
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
try:
from .A3200enums import AbrCmd, AbrMode
except ImportError:
from A3200enums import AbrCmd, AbrMode
from bec_lib import bec_logger
logger = bec_logger.logger
# pylint: disable=logging-fstring-interpolation
class AerotechAbrStage(PSIDeviceBase, Device):
"""Standard PX stage on A3200 controller
This is the wrapper class for the standard rotation stage layout for the PX
beamlines at SLS. It wraps the main rotation axis OMEGA (Aerotech ABR)and
the associated motion axes GMX, GMY and GMZ. The ophyd class associates to
the general PX measurement procedure, which is that the actual scan script
is running as an AeroBasic program on the controller and we communicate to
it via 10+1 global variables.
"""
USER_ACCESS = ["reset", "kickoff", "complete", "set_axis_mode", "arm", "disarm"]
taskStop = Component(EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted)
status = Component(EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted)
clear = Component(EpicsSignal, "-AERO:CTRL-CLFT", put_complete=True, kind=Kind.omitted)
# Enable/disable motor movement via the IOC (i.e. make it task-only)
axisModeLocked = Component(EpicsSignal, "-DF1:LOCK", put_complete=True, kind=Kind.omitted)
axisModeDirect = Component(
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted
)
axisAxesMode = Component(EpicsSignal, "-DF1:AXES-MODE", put_complete=True, kind=Kind.omitted)
# Shutter box is missing readback so the -GET signal is installed on the VME
# _shutter = Component(
# EpicsSignal, "-PH1:GET", write_pv="-PH1:SET", put_complete=True, kind=Kind.config,
# )
# Status flags for all axes
omega_done = Component(EpicsSignalRO, "-DF1:OMEGA-DONE", auto_monitor=True, kind=Kind.normal)
gmx_done = Component(EpicsSignalRO, "-DF1:GMX-DONE", auto_monitor=True, kind=Kind.normal)
gmy_done = Component(EpicsSignalRO, "-DF1:GMY-DONE", auto_monitor=True, kind=Kind.normal)
gmz_done = Component(EpicsSignalRO, "-DF1:GMZ-DONE", auto_monitor=True, kind=Kind.normal)
# For some reason the task interface is called PSO...
scan_command = Component(EpicsSignal, "-PSO:CMD", put_complete=True, kind=Kind.omitted)
start_command = Component(
EpicsSignal, "-PSO:START-TEST.PROC", put_complete=True, kind=Kind.omitted
)
stop_command = Component(
EpicsSignal, "-PSO:STOP-TEST.PROC", put_complete=True, kind=Kind.omitted
)
# Global variables to controll AeroBasic scripts
_var_1 = Component(EpicsSignal, "-PSO:VAR-1", put_complete=True, kind=Kind.omitted)
_var_2 = Component(EpicsSignal, "-PSO:VAR-2", put_complete=True, kind=Kind.omitted)
_var_3 = Component(EpicsSignal, "-PSO:VAR-3", put_complete=True, kind=Kind.omitted)
_var_4 = Component(EpicsSignal, "-PSO:VAR-4", put_complete=True, kind=Kind.omitted)
_var_5 = Component(EpicsSignal, "-PSO:VAR-5", put_complete=True, kind=Kind.omitted)
_var_6 = Component(EpicsSignal, "-PSO:VAR-6", put_complete=True, kind=Kind.omitted)
_var_7 = Component(EpicsSignal, "-PSO:VAR-7", put_complete=True, kind=Kind.omitted)
_var_8 = Component(EpicsSignal, "-PSO:VAR-8", put_complete=True, kind=Kind.omitted)
_var_9 = Component(EpicsSignal, "-PSO:VAR-9", put_complete=True, kind=Kind.omitted)
_var_10 = Component(EpicsSignal, "-PSO:VAR-10", put_complete=True, kind=Kind.omitted)
# Task status PVs (programs always run on task 1)
task1 = Component(EpicsSignalRO, "-AERO:TSK1-DONE", auto_monitor=True)
task2 = Component(EpicsSignalRO, "-AERO:TSK2-DONE", auto_monitor=True)
task3 = Component(EpicsSignalRO, "-AERO:TSK3-DONE", auto_monitor=True)
task4 = Component(EpicsSignalRO, "-AERO:TSK4-DONE", auto_monitor=True)
scan_done = Component(EpicsSignal, "-GRD:SCAN-DONE", auto_monitor=True, kind=Kind.config)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
scan_info=None,
**kwargs,
):
# super() will call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
scan_info=scan_info,
**kwargs,
)
def set_axis_mode(self, mode: str, settle_time=0.1) -> None:
"""Set axis mode to direct/measurement mode.
Measurement ensures that the scrips run undisturbed by blocking axis
motion commands from the IOC (i.e. internal only).
Parameters:
-----------
mode : str
Valid values are 'direct' and 'measuring'.
"""
if mode == "direct":
self.axisModeDirect.set(37, settle_time=settle_time).wait()
if mode == "measuring":
self.axisAxesMode.set(AbrMode.MEASURING, settle_time=settle_time).wait()
def on_stage(self):
"""
NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything.
"""
d = {}
# FIXME: I don't care about how we fish out config parameters from scan info
scan_args = {
**self.scan_info.msg.request_inputs["inputs"],
**self.scan_info.msg.request_inputs["kwargs"],
**self.scan_info.msg.scan_parameters,
}
scanname = self.scan_info.msg.scan_name
if scanname in (
"standardscan",
"helicalscan",
"helicalscan1",
"helicalscan2",
"helicalscan3",
):
d["scan_command"] = AbrCmd.MEASURE_STANDARD
d["var_1"] = scan_args["start"]
d["var_2"] = scan_args["range"]
d["var_3"] = scan_args["move_time"]
d["var_4"] = scan_args.get("ready_rate", 500)
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("verticallinescan", "vlinescan"):
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
d["var_1"] = scan_args["range"] / scan_args["steps"]
d["var_2"] = scan_args["steps"]
d["var_3"] = scan_args["exp_time"]
d["var_4"] = 0
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("screeningscan"):
d["scan_command"] = AbrCmd.SCREENING
d["var_1"] = scan_args["start"]
d["var_2"] = scan_args["oscrange"]
d["var_3"] = scan_args["exp_time"]
d["var_4"] = scan_args["range"] / scan_args["steps"]
d["var_5"] = scan_args["steps"]
d["var_6"] = scan_args.get("delta", 0.5)
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("rasterscan", "rastersimplescan"):
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
d["var_1"] = scan_args["exp_time"]
d["var_2"] = scan_args["range_x"] / scan_args["steps_x"]
d["var_3"] = scan_args["range_y"] / scan_args["steps_y"]
d["var_4"] = scan_args["steps_x"]
d["var_5"] = scan_args["steps_y"]
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
# Reconfigure if got a valid scan config
if len(d) > 0:
self.configure(d)
# Stage the ABR stage
self.arm()
def on_unstage(self):
"""Unstage the ABR controller"""
self.disarm()
def configure(self, d: dict) -> tuple:
""" " Configure the exposure scripts
Script execution at the PX beamlines is based on scripts that are always
running on the controller that execute commands when commanded by
setting a pre-defined set of global variables. This method performs the
configuration of the exposure scrips by setting the required global
variables.
Parameters in d: dict
---------------------
scan_command: int
The index of the desired AeroBasic program to be executed.
Usually supported values are taken from an Enum.
var_1:
var_2:
var_3:
var_4:
var_5:
var_6:
var_7:
var_8:
var_9:
var_10:
"""
old = self.read_configuration()
# ToDo: Check if idle before reconfiguring
self.scan_command.set(d["scan_command"]).wait()
# Set the corresponding global variables
if "var_1" in d and d["var_1"] is not None:
self._var_1.set(d["var_1"]).wait()
if "var_2" in d and d["var_2"] is not None:
self._var_2.set(d["var_2"]).wait()
if "var_3" in d and d["var_3"] is not None:
self._var_3.set(d["var_3"]).wait()
if "var_4" in d and d["var_4"] is not None:
self._var_4.set(d["var_4"]).wait()
if "var_5" in d and d["var_5"] is not None:
self._var_5.set(d["var_5"]).wait()
if "var_6" in d and d["var_6"] is not None:
self._var_6.set(d["var_6"]).wait()
if "var_7" in d and d["var_7"] is not None:
self._var_7.set(d["var_7"]).wait()
if "var_8" in d and d["var_8"] is not None:
self._var_8.set(d["var_8"]).wait()
if "var_9" in d and d["var_9"] is not None:
self._var_9.set(d["var_9"]).wait()
if "var_10" in d and d["var_10"] is not None:
self._var_10.set(d["var_10"]).wait()
new = self.read_configuration()
return old, new
def arm(self):
"""Bluesky-style stage
Since configuration synchronization is not guaranteed, this does
nothing. The script launched by kickoff().
"""
def on_kickoff(self, timeout=1) -> SubscriptionStatus:
"""Kick off the set program"""
self.start_command.set(1).wait()
# Define wait until the busy flag goes high
def is_busy(*, value, **_):
return bool(value == 0)
# Subscribe and wait for update
status = SubscriptionStatus(self.scan_done, is_busy, timeout=timeout, settle_time=0.1)
status.wait()
# return status
def disarm(self, settle_time=0.1):
"""Stops current script and releases the axes"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1).wait()
# Go to direct mode
self.set_axis_mode("direct", settle_time=settle_time)
def complete(self, timeout=None) -> SubscriptionStatus:
"""Waits for task execution
NOTE: Original complete was raster scanner complete...
"""
# Define wait until the busy flag goes down (excluding initial update)
def is_idle(*, value, **_):
return bool(value == 1)
# Subscribe and wait for update
# status = SubscriptionStatus(self.task1, is_idle, timeout=timeout, settle_time=0.5)
status = SubscriptionStatus(self.scan_done, is_idle, timeout=timeout, settle_time=0.5)
return status
def reset(self, settle_time=0.1, wait_after_reload=1) -> None:
"""Resets the Aerotech controller state
Attempts to reset the currently running measurement task on the A3200
by stopping current motions, reloading aerobasic programs and going
to DIRECT mode.
This will stop movements in both DIRECT and MEASURING modes. During the
stop the `status` temporarely goes to ERROR but reverts to OK after a
couple of seconds.
"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1, settle_time=settle_time)
# Reload tasks
self.taskStop.set(1, settle_time=wait_after_reload).wait()
# Go to direct mode
self.set_axis_mode("direct", settle_time=settle_time)
# pylint: disable=arguments-differ
def stop(self, settle_time=1.0) -> None:
"""Stops current motions"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()
self.stop_command.set(1).wait()
# Go to direct mode
self.set_axis_mode("direct", settle_time=settle_time)
def is_ioc_ok(self):
"""Checks execution status"""
return 0 == self.status.get()
@property
def axis_mode(self):
"""Read axis mode"""
return self.axisAxesMode.get()
# @property
# def shutter(self):
# return self._shutter.get()
# @shutter.setter
# def shutter(self, value):
# if self.axisAxesMode.get():
# print("ABR is not in direct mode; cannot manipulate shutter")
# return False
# state = str(state).lower()
# if state not in ["1", "0", "closed", "open"]:
# print("unknown shutter state requested")
# return None
# elif state in ["1", "open"]:
# state = 1
# elif state == ["0", "closed"]:
# state = 0
# self._shutter.set(state).wait()
# return state == self._shutter.get()
def wait_for_movements(self, timeout=60.0):
"""Waits for all motor movements"""
t_start = time.time()
t_elapsed = 0
while self.is_moving() and t_elapsed < timeout:
t_elapsed = time.time() - t_start
if timeout is not None and t_elapsed > timeout:
raise TimeoutError("Timeout waiting for all axis to stop moving")
time.sleep(0.5)
def is_moving(self):
"""Chechs if all axes are DONE"""
return not (
self.omega_done.get()
and self.gmx_done.get()
and self.gmy_done.get()
and self.gmz_done.get()
)
if __name__ == "__main__":
abr = AerotechAbrStage(prefix="X06DA-ES", name="abr")
abr.wait_for_connection()

View File

@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
"""
Enumerations for the MX specific Aerotech A3200 stage.
@author: mohacsi_i
"""
# pylint: disable=too-few-public-methods
class AbrStatus:
"""ABR measurement task status"""
DONE = 0
READY = 1
BUSY = 2
class AbrGridStatus:
"""ABR grid scan status"""
BUSY = 0
DONE = 1
class AbrMode:
"""ABR mode status"""
DIRECT = 0
MEASURING = 1
class AbrShutterStatus:
"""ABR shutter status"""
CLOSE = 0
OPEN = 1
class AbrCmd:
"""ABR command table"""
NONE = 0
RASTER_SCAN_SIMPLE = 1
MEASURE_STANDARD = 2
VERTICAL_LINE_SCAN = 3
SCREENING = 4
SUPER_FAST_OMEGA = 5
STILL_WEDGE = 6
STILLS = 7
REPEAT_SINGLE_OSCILLATION = 8
SINGLE_OSCILLATION = 9
OLD_FASHIONED = 10
RASTER_SCAN = 11
JET_ROTATION = 12
X_HELICAL = 13
X_RUNSEQ = 14
JUNGFRAU = 15
MSOX = 16
SLIT_SCAN = 17
RASTER_SCAN_STILL = 18
SCAN_SASTT = 19
SCAN_SASTT_V2 = 20
SCAN_SASTT_V3 = 21
class AbrAxis:
"""ABR axis index"""
OMEGA = 1
GMX = 2
GMY = 3
GMZ = 4
STY = 5
STZ = 6

View File

@@ -0,0 +1,259 @@
# -*- coding: utf-8 -*-
"""
Created on Tue Jun 11 11:28:38 2024
@author: mohacsi_i
"""
import types
from collections import OrderedDict
from ophyd import Component, PVPositioner, Signal, EpicsSignal, EpicsSignalRO, Kind, PositionerBase
from ophyd.status import Status, MoveStatus
from bec_lib import bec_logger
from .A3200enums import AbrMode
logger = bec_logger.logger
# ABR_DONE = 0
# ABR_READY = 1
# ABR_BUSY = 2
# GRID_SCAN_BUSY = 0
# GRID_SCAN_DONE = 1
# DIRECT_MODE = 0
# MEASURING_MODE = 1
class A3200Axis(PVPositioner):
"""Positioner wrapper for A3200 axes
Positioner wrapper for motors on the Aerotech A3200 controller. As the IOC
does not provide a motor record, this class simply wraps axes into a
standard Ophyd positioner for the BEC. It also has some additional
functionality for error checking and diagnostics.
Examples
--------
omega = A3200Axis('X06DA-ES-DF1:OMEGA', base_pv='X06DA-ES')
Parameters
----------
prefix : str
Axis PV name root.
base_pv : str (situational)
IOC PV name root, i.e. X06DA-ES if standalone class.
"""
USER_ACCESS = ["omove"]
abr_mode_direct = Component(
EpicsSignal, "-DF1:MODE-DIRECT", put_complete=True, kind=Kind.omitted
)
abr_mode = Component(
EpicsSignal, "-DF1:AXES-MODE", auto_monitor=True, put_complete=True, kind=Kind.omitted
)
# Basic PV positioner interface
done = Component(EpicsSignalRO, "-DONE", auto_monitor=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "-RBV", auto_monitor=True, kind=Kind.hinted)
# Setpoint is one of the two...
setpoint = Component(EpicsSignal, "-SETP", kind=Kind.config)
# setpoint = Component(EpicsSignal, "-VAL", kind=Kind.config)
velocity = Component(EpicsSignal, "-SETV", kind=Kind.config)
status = Component(EpicsSignalRO, "-STAT", auto_monitor=True, kind=Kind.config)
# PV to issue native relative movements on the A3200
relmove = Component(EpicsSignal, "-INCP", put_complete=True, kind=Kind.config)
# PV to home axis
home = Component(EpicsSignal, "-HOME", kind=Kind.config)
ishomed = Component(EpicsSignal, "-AS00", kind=Kind.config)
# HW status words
dshw = Component(EpicsSignalRO, "-DSHW", auto_monitor=True, kind=Kind.normal)
ashw = Component(EpicsSignalRO, "-ASHW", auto_monitor=True, kind=Kind.normal)
fslw = Component(EpicsSignalRO, "-FSLW", auto_monitor=True, kind=Kind.normal)
# Rock movement
_rock = Component(EpicsSignal, "-ROCK", put_complete=True, kind=Kind.config)
_rock_dist = Component(EpicsSignal, "-RINCP", put_complete=True, kind=Kind.config)
_rock_velo = Component(EpicsSignal, "-RSETV", put_complete=True, kind=Kind.config)
_rock_count = Component(EpicsSignal, "-COUNT", put_complete=True, kind=Kind.config)
# _rock_accel = Component(EpicsSignal, "-RRATE", put_complete=True, kind=Kind.config)
hlm = Component(Signal, kind=Kind.config)
llm = Component(Signal, kind=Kind.config)
vmin = Component(Signal, kind=Kind.config)
vmax = Component(Signal, kind=Kind.config)
offset = Component(EpicsSignal, "-OFF", put_complete=True, kind=Kind.config)
# pylint: disable=too-many-arguments
def __init__(
self,
prefix="",
*,
name,
base_pv="",
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
llm=0,
hlm=0,
vmin=0,
vmax=0,
**kwargs,
):
"""__init__ MUST have a full argument list"""
# Patching the parent's PVs into the axis class to check for direct/locked mode
if parent is None:
def maybe_add_prefix(self, _, kw, suffix):
# Patched not to enforce parent prefix when no parent
if kw in self.add_prefix:
return suffix
return suffix
self.__class__.__dict__["abr_mode"].maybe_add_prefix = types.MethodType(
maybe_add_prefix, self.__class__.__dict__["abr_mode"]
)
self.__class__.__dict__["abr_mode_direct"].maybe_add_prefix = types.MethodType(
maybe_add_prefix, self.__class__.__dict__["abr_mode_direct"]
)
logger.info(self.__class__.__dict__["abr_mode"].kwargs)
self.__class__.__dict__["abr_mode"].suffix = base_pv + "-DF1:AXES-MODE"
self.__class__.__dict__["abr_mode_direct"].suffix = base_pv + "-DF1:MODE-DIRECT"
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.llm.set(llm).wait()
self.hlm.set(hlm).wait()
self.vmin.set(vmin).wait()
self.vmax.set(vmax).wait()
def omove(
self,
position,
wait=True,
timeout=None,
# moved_cb=None,
velocity=None,
relative=False,
direct=False,
**kwargs,
) -> MoveStatus:
"""Native absolute/relative movement on the A3200"""
# Check if we're in direct movement mode
if self.abr_mode.value not in (AbrMode.DIRECT, "DIRECT"):
if direct:
self.abr_mode_direct.set(1).wait()
else:
raise RuntimeError(f"ABR axis not in direct mode: {self.abr_mode.value}")
# Before moving, ensure we can stop (if a stop_signal is configured).
if self.stop_signal is not None:
self.stop_signal.wait_for_connection()
# Set velocity if provided
if velocity is not None:
self.velocity.set(velocity).wait()
# This is adapted from pv_positioner.py
self.check_value(position)
# Get MoveStatus from parent of parent
status = PositionerBase.move(self, position=position, timeout=timeout, **kwargs)
has_done = self.done is not None
if not has_done:
moving_val = 1 - self.done_value
self._move_changed(value=self.done_value)
self._move_changed(value=moving_val)
try:
if relative:
# Relative movement instead of setpoint
self.relmove.put(position, wait=True)
else:
# Standard absolute movement
self.setpoint.put(position, wait=True)
if wait:
status.wait()
except KeyboardInterrupt:
self.stop()
raise
return status
def describe(self):
"""Workaround to schema expected by the BEC"""
d = super().describe()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def read(self) -> OrderedDict[str, dict]:
"""Workaround to schema expected by the BEC"""
d = super().read()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def move(self, position, wait=True, timeout=None, moved_cb=None, **kwargs) -> MoveStatus:
"""Exposes the ophyd move command through BEC abstraction"""
return self.omove(position, wait=wait, timeout=timeout, moved_cb=moved_cb, **kwargs)
def rock(self, distance, counts: int, velocity=None, wait=True) -> Status:
"""Repeated single axis zigzag scan I guess PSO should be configured for this"""
self._rock_dist.put(distance)
self._rock_count.put(counts)
if velocity is not None:
self._rock_velo.put(velocity)
# if acceleration is not None:
# self._rock_accel.put(acceleration)
self._rock.put(1)
status = super().move(position=distance)
if wait:
status.wait()
return status
# def is_omega_ok(self):
# """Checks omega axis status"""
# return 0 == self.self.omega.status.get()
# def is_homed(self):
# """Checks if omega is homed"""
# return 1 == self.omega.is_homed.get()
# def do_homing(self, wait=True):
# """Execute the homing procedure.
# Executes the homing procedure on omega and waits (default) until it is completed.
# TODO: Return a status object to do this wwith futures and monitoring.
# PARAMETERS
# `wait` true / false if the routine is to wait for the homing to finish.
# """
# self.omega.home.set(1, settle_time=1).wait()
# if not wait:
# return
# while not self.omega.is_homed():
# time.sleep(0.2)
# Automatically start an axis if directly invoked
if __name__ == "__main__":
omega = A3200Axis(prefix="X06DA-ES-DF1:OMEGA", base_pv="X06DA-ES", name="omega")
omega.wait_for_connection()

View File

@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
"""
``NDArrayPreview`` --- Standalone Preview for ImagePlugin
*********************************************************
This module provides a standalone object to receive images to ophyd from the
AreaDetector's ImagePlugin.
Created on Wed Jan 29 2025
@author: mohacsi_i
"""
import numpy as np
from ophyd import Device, Component, EpicsSignal, EpicsSignalWithRBV, Kind, Staged
from ophyd.areadetector import NDDerivedSignal
from bec_lib import bec_logger
logger = bec_logger.logger
class SilentNDDerivedSignal(NDDerivedSignal):
"""Silent version of NDDerivedSignal, it does not spam the terminal on
every defective frame (shit happens, ok?)."""
def _array_shape_callback(self, **kwargs):
try:
super()._array_shape_callback(**kwargs)
except RuntimeError:
pass
class NDArrayPreview(Device):
"""Wrapper class around AreaDetector's NDStdArray plugins
This is a standalone class to display images from AreaDetector's
ImagePlugin without using a parent device. It also offers BEC exposed
methods to transfer image and change image array Kind-ness.
NOTE: As an explicit request, it can toggle data recording
"""
# Subscriptions for plotting image
USER_ACCESS = ["image", "savemode"]
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
# Status attributes
min_callback_time = Component(
EpicsSignalWithRBV, "MinCallbackTime", kind=Kind.config, put_complete=True
)
array_size_x = Component(EpicsSignal, "ArraySize0_RBV", kind=Kind.config)
array_size_y = Component(EpicsSignal, "ArraySize1_RBV", kind=Kind.config)
array_size_z = Component(EpicsSignal, "ArraySize2_RBV", kind=Kind.config)
ndimensions = Component(EpicsSignal, "NDimensions_RBV", kind=Kind.config)
array_data = Component(EpicsSignal, "ArrayData", kind=Kind.omitted)
shaped_image = Component(
SilentNDDerivedSignal,
derived_from="array_data",
shape=("array_size_z", "array_size_y", "array_size_x"),
num_dimensions="ndimensions",
kind=Kind.omitted,
)
def read(self):
"""Stream out data on every read()"""
if self._staged == Staged.yes:
image = self.shaped_image.get()
self._run_subs(sub_type=self.SUB_MONITOR, value=image)
return super().read()
def savemode(self, save=False):
"""Toggle save mode for the shaped image"""
# pylint: disable=protected-access
if save:
self.shaped_image._kind = Kind.normal
else:
self.shaped_image._kind = Kind.omitted
def image(self):
"""Fallback method in case image streaming fills up the BEC"""
array_size = (self.array_size_z.get(), self.array_size_y.get(), self.array_size_x.get())
if array_size == (0, 0, 0):
raise RuntimeError("Invalid image; ensure array_callbacks are on")
if array_size[-1] == 0:
array_size = array_size[:-1]
image = self.array_data.get()
return np.array(image).reshape(array_size)
# Automatically connect to SAMCAM at PXIII if directly invoked
if __name__ == "__main__":
img = NDArrayPreview("X06DA-SAMCAM:image1:", name="samimg")
img.wait_for_connection()

View File

@@ -0,0 +1,47 @@
from ophyd import EpicsSignal
from ophyd.status import SubscriptionStatus
class PneumaticValve(EpicsSignal):
"""Wrapper around EpicsSignal to wait until reaching target. Use the
status returned by set() to wait until movement is finished. Do NOT
use put if you want to wait, that's a low-level PV write op.
NOTE: The SET and GET states do not match exactly
"""
def set(self, value, *, timeout=5, settle_time=0.1):
"""Overloaded setter that waits for target state
NOTE: The SubscriptionStatus callback does not run in put()
"""
# Lazy hardcoded state lookup
target = 1 if value in (1, "Measure") else 2
# Define wait until an end state is reached
def on_target(*, value, **_):
return bool(value == target)
# Subscribe a monitor in advance and wait for update
status = SubscriptionStatus(self, on_target, timeout=timeout, settle_time=0.1)
# Set value to start movement
super().set(value, settle_time=settle_time).wait()
# Return the monitor
return status
def check_value(self, value):
"""Input validation"""
if value not in (0, 1, "Measure", "Park"):
raise ValueError(f"Unsupported pneumatic valve target {value}")
return super().check_value(value)
if __name__ == "__main__":
pneum = PneumaticValve(
read_pv="X06DA-ES-BS:GET-POS",
write_pv="X06DA-ES-BS:SET-POS",
auto_monitor=True,
put_complete=True,
name="bspump",
)
pneum.wait_for_connection()

View File

@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
"""
``SamCam`` --- Sample Camera control software
*********************************************
This module provides an object to control the sample camera at the PX III
beamline. The camera should run continously and stream data via ZMQ for
the GUI and alignment scripts.
Created on Thu Jan 30 2025
@author: mohacsi_i
"""
from ophyd import ADComponent
from ophyd_devices.devices.areadetector.cam import GenICam
# from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
PSIDetectorBase,
CustomDetectorMixin,
)
from bec_lib import bec_logger
logger = bec_logger.logger
class SamCamSetup(CustomDetectorMixin):
"""Simple camera mixin class, the SAMCAM is usually streaming"""
def on_stage(self):
"""Just make sure it's running continously"""
self.parent.cam.acquire.put(1, wait=True)
def on_unstage(self):
"""Should run continously"""
def on_stop(self):
"""Should run continously"""
class SamCamDetector(PSIDetectorBase):
"""Sample camera device
The SAMCAM continously streams images to the GUI and sample alignment
scripts via ZMQ.
"""
custom_prepare_cls = SamCamSetup
cam = ADComponent(GenICam, "cam1:")
# image = ADComponent(ImagePlugin_V35, "image1:")

View File

@@ -0,0 +1,284 @@
"""
``SmarGon`` --- SmarGon control software
******************************************
The module provides an object to control the SmarGon goniometer axes at PX III.
The SmarGon axes are interfaced as positioners.
"""
import time
from threading import Thread, Lock
import requests
from requests.adapters import HTTPAdapter, Retry
from collections import OrderedDict
from ophyd import Component, Kind, Signal, PVPositioner
from ophyd.status import SubscriptionStatus
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("SmarGon")
# SmarGon contoller can't really handle multiple connections
# Use this mutex to ensure one access at a time
mutex = Lock()
class SmarGonSignal(Signal):
"""SmarGonSignal (R/W)
Small helper class to read/write parameters from SmarGon. As there is no
motion status readback from smargopolo, this should be substituted with
setting with 'settle_time'.
"""
def __init__(self, *args, write_addr="targetSCS", low_limit=None, high_limit=None, **kwargs):
super().__init__(*args, **kwargs)
self.write_addr = write_addr
self.addr = self.parent.name
self._limits = (low_limit, high_limit)
# self.get()
def put(self, value, *, timestamp=None, **kwargs):
"""Overriden put to add communication with smargopolo"""
# Validate new value and get timestamp
self.check_value(value)
if timestamp is None:
timestamp = time.time()
# Perform the actual write to SmargoPolo
# pylint: disable=protected-access
r = self.parent._go_n_put(f"{self.write_addr}?{self.addr.upper()}={value}")
# pylint: disable=attribute-defined-outside-init
old_value = self._readback
self._timestamp = timestamp
self._readback = r[self.addr.upper()]
self._value = r[self.addr.upper()]
# Notify subscribers
self._run_subs(
sub_type=self.SUB_VALUE, old_value=old_value, value=value, timestamp=self._timestamp
)
@property
def limits(self):
return self._limits
def check_value(self, value, **kwargs):
"""Check if value falls within limits"""
lol = self.limits[0]
if lol is not None:
if value < lol:
raise ValueError(f"Target {value} outside of limits {self.limits}")
hil = self.limits[1]
if hil is not None:
if value > hil:
raise ValueError(f"Target {value} outside of limits {self.limits}")
def get(self, **kwargs):
# pylint: disable=protected-access
r = self.parent._go_n_get(self.write_addr)
# pylint: disable=attribute-defined-outside-init
self._value = r[self.addr.upper()] if isinstance(r, dict) else r
return super().get(**kwargs)
class SmarGonSignalRO(Signal):
"""Small helper class for read-only parameters PVs from SmarGon.
Reads and optionally monitors a variable on the SmarGon.
"""
def __init__(self, *args, read_addr="readbackSCS", auto_monitor=False, **kwargs):
super().__init__(*args, **kwargs)
self._metadata["write_access"] = False
self.read_addr = read_addr
self.addr = self.parent.name
if auto_monitor:
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def get(self, **kwargs):
# pylint: disable=protected-access
r = self.parent._go_n_get(self.read_addr)
if isinstance(r, dict):
self.put(r[self.addr.upper()], force=True)
else:
self.put(r, force=True)
return self._readback
def poll(self):
"""Fooo"""
time.sleep(2)
while True:
time.sleep(0.25)
try:
self.get()
except requests.ConnectTimeout as ex:
logger.error(f"[{self.name}] {ex}")
class SmarGonAxis(PVPositioner):
"""SmarGon client deice
This class controls the SmarGon goniometer via the REST interface. All
SmarGon axes share a common mutex to manage actual HW access.
"""
USER_ACCESS = ["omove"]
# Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
corr = Component(SmarGonSignalRO, read_addr="corr_type", kind=Kind.config)
mode = Component(SmarGonSignalRO, read_addr="mode", kind=Kind.config)
# Axis parameters
readback = Component(SmarGonSignalRO, kind=Kind.hinted, auto_monitor=True)
setpoint = Component(SmarGonSignal, kind=Kind.normal)
done = Component(Signal, value=1, kind=Kind.normal)
# moving = Component(SmarGonMovingSignalRO, kind=Kind.config)
_tol = 0.001
# pylint: disable=too-many-arguments
def __init__(
self,
prefix="SCS",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
low_limit=None,
high_limit=None,
**kwargs,
) -> None:
self.__class__.__dict__["readback"].kwargs["read_addr"] = f"readback{prefix}"
self.__class__.__dict__["setpoint"].kwargs["write_addr"] = f"target{prefix}"
self.__class__.__dict__["setpoint"].kwargs["low_limit"] = low_limit
self.__class__.__dict__["setpoint"].kwargs["high_limit"] = high_limit
self.__class__.__dict__["sg_url"].kwargs["value"] = sg_url
# Fine-tune HTTP connection behavior
# NOTE: SmarGon has a few failed requests every one in a while
self._s = requests.Session()
retries = Retry(total=5, backoff_factor=0.05, status_forcelist=[500, 502, 503, 504])
self._s.mount("http://", HTTPAdapter(max_retries=retries))
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
def initialize(self):
"""Helper function for initial readings"""
# self.corr.get()
# self.mode.get()
r = self._go_n_get("corr_type")
print(r)
def move(self, position, wait=True, timeout=None, moved_cb=None):
"""Move command that's masked by BEC"""
return self.omove(position, wait, timeout, moved_cb)
def omove(self, position, wait=True, timeout=2.0, moved_cb=None):
"""Original move command without the BEC wrappers"""
status = self.setpoint.set(position, settle_time=0.1)
status.wait()
if not wait:
return status
def on_target(*, value, **_):
distance = abs(value - self.setpoint._value)
print(f"[self.name] Distance: {distance}")
return bool(distance < self._tol)
status = SubscriptionStatus(self.readback, on_target, timeout=timeout, settle_time=0.1)
return status
def describe(self):
"""Workaround to schema expected by the BEC"""
d = super().describe()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def read(self) -> OrderedDict[str, dict]:
"""Workaround to schema expected by the BEC"""
d = super().read()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def _pos_changed(self, timestamp=None, value=None, **kwargs):
pass
def _go_n_get(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
try:
with mutex:
r = self._s.get(cmd, timeout=1, **kwargs)
except TimeoutError:
try:
time.sleep(0.05)
with mutex:
r = self._s.get(cmd, timeout=0.5, **kwargs)
except TimeoutError:
time.sleep(0.05)
with mutex:
r = self._s.get(cmd, timeout=0.5, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error getting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
def _go_n_put(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
try:
with mutex:
r = self._s.put(cmd, timeout=1, **kwargs)
except TimeoutError:
try:
time.sleep(0.05)
with mutex:
r = self._s.put(cmd, timeout=0.5, **kwargs)
except TimeoutError:
time.sleep(0.05)
with mutex:
r = self._s.put(cmd, timeout=0.5, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error putting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
if __name__ == "__main__":
shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000")
shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000")
shz = SmarGonAxis(
prefix="SCS",
name="shz",
low_limit=10,
high_limit=22,
sg_url="http://x06da-smargopolo.psi.ch:3000",
)
shx.wait_for_connection()
shy.wait_for_connection()
shz.wait_for_connection()

View File

@@ -0,0 +1,249 @@
"""
``SmarGon`` --- SmarGon control software
******************************************
The module provides an object to control the SmarGon goniometer axes at PX III.
The SmarGon axes are interfaced as positioners.
"""
import time
import threading
from collections import OrderedDict
import requests
from requests.adapters import HTTPAdapter, Retry
from ophyd import Component, Kind, Signal, PVPositioner
from ophyd.status import SubscriptionStatus
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("SmarGon")
# SmarGon contoller can't really handle multiple connections
# Use this mutex to ensure one access at a time
mutex = threading.Lock()
class LimitedSmarGonSignal(Signal):
"""SmarGonSignal (R/W)
Small helper class to read/write parameters from SmarGon. As there is no
motion status readback from smargopolo, this should be substituted with
setting with 'settle_time'.
"""
def __init__(self, *args, write_addr="targetSCS", low_limit=None, high_limit=None, **kwargs):
self._limits = (low_limit, high_limit)
super().__init__(*args, **kwargs)
self.write_addr = write_addr
@property
def limits(self):
return self._limits
def check_value(self, value, **kwargs):
"""Check if value falls within limits"""
lol = self.limits[0]
if lol is not None:
if value < lol:
raise ValueError(f"Target {value} outside of limits {self.limits}")
hil = self.limits[1]
if hil is not None:
if value > hil:
raise ValueError(f"Target {value} outside of limits {self.limits}")
def put(self, value, *, timestamp=None, force=False, metadata=None, **kwargs):
"""Overriden put to add communication with smargopolo"""
# Validate new value and get timestamp
if not force:
self.check_value(value)
if timestamp is None:
timestamp = time.time()
# Perform the actual write to SmargoPolo
# pylint: disable=protected-access
r = self.parent._go_n_put(f"{self.write_addr}?{self.parent.name.upper()}={value}")
# pylint: disable=attribute-defined-outside-init
old_value = self._readback
self._timestamp = timestamp
self._readback = r[self.parent.name.upper()]
self._value = r[self.parent.name.upper()]
# Notify subscribers
self._run_subs(
sub_type=self.SUB_VALUE, old_value=old_value, value=value, timestamp=self._timestamp
)
class SmarGonAxis(PVPositioner):
"""SmarGon client deice
This class controls the SmarGon goniometer via the REST interface. All
SmarGon axes share a common mutex to manage actual HW access.
"""
USER_ACCESS = ["omove", "oldmove"]
# Status attributes
sg_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
# Axis parameters
readback = Component(Signal, kind=Kind.hinted, metadata={"write_access": False})
setpoint = Component(LimitedSmarGonSignal, kind=Kind.normal)
done = Component(Signal, value=1, kind=Kind.normal, metadata={"write_access": False})
_tol = 0.001
# pylint: disable=too-many-arguments
def __init__(
self,
prefix="SCS",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
sg_url: str = "http://x06da-smargopolo.psi.ch:3000",
low_limit=None,
high_limit=None,
**kwargs,
) -> None:
# self.__class__.__dict__["setpoint"].kwargs["write_addr"] = f"target{prefix}"
self.__class__.__dict__["setpoint"].kwargs["low_limit"] = low_limit
self.__class__.__dict__["setpoint"].kwargs["high_limit"] = high_limit
self.__class__.__dict__["sg_url"].kwargs["value"] = sg_url
# Fine-tune HTTP connection behavior
# NOTE: SmarGon has a few failed requests every one in a while
self._s = requests.Session()
retries = Retry(total=5, backoff_factor=0.05, status_forcelist=[500, 502, 503, 504])
self._s.mount("http://", HTTPAdapter(max_retries=retries))
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
def on_target():
"""Monitors the setpoint and readback and calculates the on_target flag"""
time.sleep(2)
while True:
# Read back target and setpoint values
# pylint: disable=protected-access
r = self._go_n_get("readbackSCS")
rb = r[self.name.upper()]
self.readback.set(rb, force=True).wait()
r = self._go_n_get("targetSCS")
sp = r[self.name.upper()]
self.setpoint._value = sp
# print(f"Readback: {rb}\tSetpoint: {sp}")
# Check if they're within tolerance
distance = abs(rb - sp)
done = 1 if distance < self._tol else 0
self.done.put(done, force=True)
time.sleep(0.2)
self._mon = threading.Thread(target=on_target, daemon=True)
self._mon.start()
def omove(self, position, wait=True, timeout=None, moved_cb=None):
"""Move command that's masked by BEC"""
self.done.put(0, force=True)
return self.move(position, wait, timeout, moved_cb)
def oldmove(self, position, wait=True, timeout=2.0, moved_cb=None):
"""Original move command without the BEC wrappers"""
status = self.setpoint.set(position, settle_time=0.1).wait()
if not wait:
return status
def on_target(*, value, **_):
distance = abs(value - self.setpoint._value)
print(f"[self.name] Distance: {distance}")
return bool(distance < self._tol)
status = SubscriptionStatus(self.readback, on_target, timeout=timeout, settle_time=0.1)
return status
def describe(self):
"""Workaround to schema expected by the BEC"""
d = super().describe()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def read(self) -> OrderedDict[str, dict]:
"""Workaround to schema expected by the BEC"""
d = super().read()
d[str(self.name)] = d[f"{self.name}_readback"]
return d
def _pos_changed(self, timestamp=None, value=None, **kwargs):
"""Remove EPICS dependency"""
pass
def _go_n_get(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
try:
with mutex:
r = self._s.get(cmd, timeout=1, **kwargs)
except TimeoutError:
try:
time.sleep(0.05)
with mutex:
r = self._s.get(cmd, timeout=0.5, **kwargs)
except TimeoutError:
time.sleep(0.05)
with mutex:
r = self._s.get(cmd, timeout=0.5, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error getting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
def _go_n_put(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
try:
with mutex:
r = self._s.put(cmd, timeout=1, **kwargs)
except TimeoutError:
try:
time.sleep(0.05)
with mutex:
r = self._s.put(cmd, timeout=0.5, **kwargs)
except TimeoutError:
time.sleep(0.05)
with mutex:
r = self._s.put(cmd, timeout=0.5, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error putting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
if __name__ == "__main__":
shx = SmarGonAxis(prefix="SCS", name="shx", sg_url="http://x06da-smargopolo.psi.ch:3000")
shy = SmarGonAxis(prefix="SCS", name="shy", sg_url="http://x06da-smargopolo.psi.ch:3000")
shz = SmarGonAxis(
prefix="SCS",
name="shz",
low_limit=10,
high_limit=22,
sg_url="http://x06da-smargopolo.psi.ch:3000",
)
shx.wait_for_connection()
shy.wait_for_connection()
shz.wait_for_connection()

View File

@@ -0,0 +1,393 @@
# #!/usr/bin/env python3
# from time import sleep, time
# from typing import Tuple
# from requests import get, put
# from beamline import beamline
# from mx_redis import SMARGON
# try:
# from mx_preferences import get_config
# host = get_config(beamline)["smargon"]["host"]
# port = get_config(beamline)["smargon"]["port"]
# except Exception:
# host = "x06da-smargopolo.psi.ch"
# port = 3000
# base = f"http://{host}:{port}"
# def gonget(thing: str, **kwargs) -> dict:
# """issue a GET for some API component on the smargopolo server"""
# cmd = f"{base}/{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# r = get(cmd)
# if not r.ok:
# raise Exception(f"error getting {thing}; server returned {r.status_code} => {r.reason}")
# return r.json()
# def gonput(thing: str, **kwargs):
# """issue a PUT for some API component on the smargopolo server"""
# cmd = f"{base}/{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# put(cmd)
# def scsput(**kwargs):
# """
# Issue a new absolute target in the SH coordinate system.
# The key "verbose" may be passed in kwargs with any true
# value for verbose behaviour.
# :param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi")
# :type kwargs: dict
# :return:
# :rtype:
# """
# xyz = {
# k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
# }
# thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
# cmd = f"{base}/targetSCS?{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# put(cmd)
# def bcsput(**kwargs):
# """
# Issue a new absolute target in the beamline coordinate system.
# The key "verbose" may be passed in kwargs with any true
# value for verbose behaviour.
# :param kwargs: a dict containing keys ("bx", "by", "bz", "chi", "phi")
# :return:
# :rtype:
# """
# xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz", "chi", "phi")}
# thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
# cmd = f"{base}/targetBCS?{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# put(cmd)
# def scsrelput(**kwargs) -> None:
# """
# Issue relative increments to current SH coordinate system.
# The key "verbose" may be passed in kwargs with any true
# value for verbose behaviour.
# :param kwargs: a dict containing keys ("shx", "shy", "shz", "chi", "phi")
# :type kwargs: dict
# :return:
# :rtype:
# """
# xyz = {
# k.upper(): v for k, v in kwargs.items() if k.lower() in ("shx", "shy", "shz", "chi", "phi")
# }
# thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
# cmd = f"{base}/targetSCS_rel?{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# put(cmd)
# def bcsrelput(**kwargs):
# """
# Issue relative increments to current beamline coordinate system.
# The key "verbose" may be passed in kwargs with any true
# value for verbose behaviour.
# :param kwargs: a dict containing keys ("bx", "by", "bz")
# :type kwargs: dict
# :return:
# :rtype:
# """
# xyz = {k.upper(): v for k, v in kwargs.items() if k.lower() in ("bx", "by", "bz")}
# thing = "&".join([f"{k.upper()}={float(v):.5f}" for k, v in xyz.items()])
# cmd = f"{base}/targetBCS_rel?{thing}"
# if kwargs.get("verbose", False):
# print(cmd)
# put(cmd)
# # url_redis = f"{beamline}-cons-705.psi.ch"
# # print(f"connecting to redis DB #3 on host: {url_redis}")
# # redis_handle = redis.StrictRedis(host=url_redis, db=3)
# # pubsub = redis_handle.pubsub()
# MODE_UNINITIALIZED = 0
# MODE_INITIALIZING = 1
# MODE_READY = 2
# MODE_ERROR = 99
# class SmarGon(object):
# def __init__(self):
# super(SmarGon, self).__init__()
# self.__dict__.update(target=None)
# self.__dict__.update(bookmarks={})
# self.__dict__.update(_latest_message={})
# # pubsub.psubscribe(**{f"__keyspace@{SMARGON.value}__:*": self._cb_readbackSCS})
# # pubsub.run_in_thread(sleep_time=0.5, daemon=True)
# def __repr__(self):
# BX, BY, BZ, OMEGA, CHI, PHI, a, b, c = self.readback_bcs().values()
# return f"<{self.__class__.__name__} X={BX:.3f}, Y={BY:.3f}, Z={BZ:.3f}, CHI={CHI:.3f}, PHI={PHI:.3f}, OMEGA={OMEGA:.3f}>"
# def _cb_readbackSCS(self, msg):
# if msg["data"] in ["hset"]:
# self._latest_message = msg
# def move_home(self, wait=False) -> None:
# """move to beamline coordinate system X, Y, Z, Chi, Phi = 0 0 0 0 0"""
# self.apply_bookmark_sh({"shx": 0.0, "shy": 0.0, "shz": 18.0, "chi": 0.0, "phi": 0.0})
# if wait:
# self.wait_home()
# def xyz(self, coords: Tuple[float, float, float], wait: bool = True) -> None:
# """
# Move smargon in absolute beamline coordinates
# :param coords: a tuple of floats representing X, Y, Z coordinates
# :type coords:
# :param wait:
# :type wait:
# :return:
# :rtype:
# """
# x, y, z = coords
# # the two steps below are necessary otherwise the control system
# # remembers *a* previous CHI
# bcs = self.bcs
# bcs.update({"BX": x, "BY": y, "BZ": z})
# self.bcs = bcs
# if wait:
# self.wait()
# def wait_home(self, timeout: float = 20.0) -> None:
# """
# wait for the smargon to reach its home position:
# SHX = 0.0
# SHY = 0.0
# SHZ = 18.0
# CHI = 0.0
# PHI = 0.0
# :param timeout: time to wait for positions to be reached raises TimeoutError if timeout reached
# :type timeout: float
# :return:
# :rtype:
# """
# tout = timeout + time()
# in_place = [False, False]
# rbv = -999.0
# while not all(in_place) and time() < tout:
# rbv = self.readback_scs()
# in_place = []
# for k, v in {"SHX": 0.0, "SHY": 0.0, "SHZ": 18.0, "CHI": 0.0, "PHI": 0.0}.items():
# in_place.append(abs(rbv[k] - v) < 0.01)
# if time() > tout:
# raise TimeoutError(f"timeout waiting for smargon to reach home position: {rbv}")
# def push_bookmark(self):
# """
# save current absolute coordinates in FIFO stack
# :return:
# :rtype:
# """
# t = round(time())
# self.bookmarks[t] = self.readback_scs()
# def pop_bookmark(self):
# return self.bookmarks.popitem()[1]
# def apply_bookmark_sh(self, scs):
# scsput(**scs)
# def apply_last_bookmark_sh(self):
# scs = self.pop_bookmark()
# scsput(**scs)
# def readback_mcs(self):
# """current motor positions of the smargon sliders"""
# return gonget("readbackMCS")
# def readback_scs(self):
# """current SH coordinates of the smargon model"""
# return gonget("readbackSCS")
# def readback_bcs(self):
# """current beamline coordinates of the smargon"""
# return gonget("readbackBCS")
# def target_scs(self):
# """currently assigned targets for the smargon control system"""
# return gonget("targetSCS")
# def initialize(self):
# """initialize the smargon"""
# self.set_mode(MODE_UNINITIALIZED)
# sleep(0.1)
# self.set_mode(MODE_INITIALIZING)
# def set_mode(self, mode: int):
# """put smargon control system in a given mode
# MODE_UNINITIALIZED = 0
# MODE_INITIALIZING = 1
# MODE_READY = 2
# MODE_ERROR = 99
# """
# gonput(f"mode?mode={mode}")
# def enable_correction(self):
# """enable calibration based corrections"""
# gonput("corr_type?corr_type=1")
# def disable_correction(self):
# """disable calibration based corrections"""
# gonput("corr_type?corr_type=0")
# def chi(self, val=None, wait=False):
# if val is None:
# return self.readback_scs()["CHI"]
# scsput(CHI=val)
# if wait:
# timeout = 10 + time()
# while time() < timeout:
# if abs(val - self.readback_scs()["CHI"]) < 0.1:
# break
# if time() > timeout:
# raise RuntimeError(f"SmarGon CHI did not reach requested target {val} in time")
# def phi(self, val=None, wait=False):
# if val is None:
# return self.readback_scs()["PHI"]
# scsput(PHI=val)
# if wait:
# timeout = 70 + time()
# while time() < timeout:
# if abs(val - self.readback_scs()["PHI"]) < 0.1:
# break
# if time() > timeout:
# raise RuntimeError(f"SmarGon PHI did not reach requested target {val} in time")
# def wait(self, timeout=60.0):
# """waits up to `timeout` seconds for smargon to reach target"""
# target = {
# k.upper(): v
# for k, v in self.target_scs().items()
# if k.lower() in ("shx", "shy", "shz", "chi", "phi")
# }
# timeout = timeout + time()
# while time() < timeout:
# s = {
# k: (abs(v - target[k]) < 0.01)
# for k, v in self.readback_scs().items()
# if k.upper() in ("SHX", "SHY", "SHZ", "CHI", "PHI")
# }
# if all(list(s.values())):
# break
# if time() > timeout:
# raise TimeoutError("timed out waiting for smargon to reach target")
# def __setattr__(self, key, value):
# key = key.lower()
# if key == "mode":
# self.set_mode(value)
# elif key == "correction":
# assert value in (
# 0,
# 1,
# False,
# True,
# ), "correction is either 1 or True (enabled) or 0 (disabled)"
# gonput(f"corr_type?corr_type?{value}")
# elif key == "scs":
# scsput(**value)
# elif key == "bcs":
# bcsput(**value)
# elif key == "target":
# if not isinstance(value, dict):
# raise Exception(
# f"expected a dict with target axis and values got something else: {value}"
# )
# for k in value.keys():
# if k.lower() not in "shx shy shz chi phi ox oy oz".split():
# raise Exception(f'unknown axis in target "{k}"')
# scsput(**value)
# elif key in "shx shy shz chi phi ox oy oz".split():
# scsput(**{key: value})
# elif key in "bx by bz".split():
# bcs = self.readback_bcs()
# bcs[key] = value
# bcsput(**bcs)
# else:
# self.__dict__[key].update(value)
# def __getattr__(self, key):
# key = key.lower()
# if key == "mode":
# return self.readback_mcs()["mode"]
# elif key == "correction":
# return gonget("corr_type")
# elif key == "bcs":
# return self.readback_bcs()
# elif key == "mcs":
# return self.readback_mcs()
# elif key == "scs":
# return self.readback_scs()
# elif key in "shx shy shz chi phi ox oy oz".split():
# return self.readback_scs()[key.upper()]
# elif key in "bx by bz".split():
# return self.readback_bcs()[key.upper()]
# else:
# return self.__getattribute__(key)
# if __name__ == "__main__":
# import argparse
# parser = argparse.ArgumentParser(description="SmarGon client")
# parser.add_argument("-i", "--initialize", help="initialize smargon", action="store_true")
# args = parser.parse_args()
# smargon = SmarGon()
# if args.initialize:
# print("initializing smargon device")
# import Aerotech
# print("moving aerotech back by 50mm")
# abr = Aerotech.Abr()
# abr.incr_x(-50.0, wait=True, velo=100.0)
# print("issuing init command to smargon")
# smargon.initialize()
# sleep(0.5)
# print("waiting for init routine to complete")
# while MODE_READY != smargon.mode:
# sleep(0.5)
# print("moving smargon to HOME position")
# smargon.move_home()
# print("moving aerotech to its previous position")
# abr.incr_x(50.0, wait=True, velo=100.0)
# exit(0)

View File

@@ -0,0 +1,206 @@
# -*- coding: utf-8 -*-
"""
Standard DAQ preview image stream module
Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
import json
from time import sleep, time
from threading import Thread
import zmq
import numpy as np
from ophyd import Device, Signal, Component, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from bec_lib import bec_logger
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
class StdDaqPreviewMixin(CustomDetectorMixin):
"""Setup class for the standard DAQ preview stream
Parent class: CustomDetectorMixin
"""
_mon = None
def on_stage(self):
"""Start listening for preview data stream"""
if self._mon is not None:
self.parent.unstage()
sleep(0.5)
logger.info(f"[{self.parent.name}] Attaching monitor to {self.parent.url.get()}")
self.parent.connect()
self._stop_polling = False
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def on_unstage(self):
"""Stop a running preview"""
if self._mon is not None:
self._stop_polling = True
# Might hang on recv_multipart
self._mon.join(timeout=1)
# So also disconnect the socket
try:
# pylint: disable=protected-access
self.parent._socket.disconnect(self.parent.url.get())
except zmq.error.ZMQError:
# Might be already closed
pass
def on_stop(self):
"""Stop a running preview"""
self.on_unstage()
def poll(self):
"""Collect streamed updates"""
try:
t_last = time()
while True:
try:
# Exit loop and finish monitoring
if self._stop_polling:
logger.info(f"[{self.parent.name}]\tDetaching monitor")
break
# pylint: disable=no-member
# pylint: disable=protected-access
r = self.parent._socket.recv_multipart(flags=zmq.NOBLOCK)
# Length and throtling checks
if len(r) != 2:
logger.warning(
f"[{self.parent.name}] Received malformed array of length {len(r)}"
)
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed < self.parent.throttle.get():
sleep(0.1)
continue
# Unpack the Array V1 reply to metadata and array data
meta, data = r
# Update image and update subscribers
header = json.loads(meta)
image = np.frombuffer(data, dtype=header["type"])
if image.size != np.prod(header["shape"]):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header["shape"])
# Update image and update subscribers
self.parent.array_counter.put(header["frame"], force=True)
self.parent.ndimensions.put(len(header["shape"]), force=True)
self.parent.array_size.put(header["shape"], force=True)
# self.parent.array_data.put(data, force=True)
self.parent.shaped_image.put(image, force=True)
# pylint: disable=protected-access
self.parent._last_image = image
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
t_last = t_curr
logger.info(
f"[{self.parent.name}] Updated frame {header['frame']}\t"
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
)
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
logger.info(f"[{self.parent.name}]\t{str(ex)}")
raise
finally:
self._mon = None
logger.info(f"[{self.parent.name}]\tDetaching monitor")
class StdDaqPreviewDetector(PSIDetectorBase):
"""Detector wrapper class around the StdDaq preview image stream.
This was meant to provide live image stream directly from the StdDAQ but
also works with other ARRAY v1 streamers, like the AreaDetector ZMQ plugin.
Note that the preview stream must be already throtled in order to cope with
the incoming data and the python class might throttle it further.
NOTE: As an explicit request, it does not record the image data.
You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
"""
# Subscriptions for plotting image
USER_ACCESS = ["image", "savemode"]
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
custom_prepare_cls = StdDaqPreviewMixin
# Configuration attributes
url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
throttle = Component(Signal, value=0.25, kind=Kind.config)
# Streamed data status
array_counter = Component(Signal, kind=Kind.hinted, metadata={"write_access": False})
ndimensions = Component(Signal, kind=Kind.normal, metadata={"write_access": False})
array_size = Component(Signal, kind=Kind.normal, metadata={"write_access": False})
# array_data = Component(Signal, kind=Kind.omitted, metadata={"write_access": False})
shaped_image = Component(Signal, kind=Kind.omitted, metadata={"write_access": False})
_last_image = None
def __init__(
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
) -> None:
super().__init__(*args, parent=parent, **kwargs)
self.url.set(url, force=True).wait()
# Connect to the DAQ
self.connect()
def connect(self):
"""Connect to te StDAQs PUB-SUB streaming interface
StdDAQ may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
# pylint: disable=no-member
# Socket to talk to server
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
try:
self._socket.connect(self.url.get())
except ConnectionRefusedError:
sleep(1)
self._socket.connect(self.url.get())
def savemode(self, save=False):
"""Toggle save mode for the shaped image"""
# pylint: disable=protected-access
if save:
self.shaped_image._kind = Kind.normal
else:
self.shaped_image._kind = Kind.omitted
def image(self):
"""
Gets the last image as an attribute in case image must be abandoned
due to some caching on the BEC.
"""
return self._last_image
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":
daq = StdDaqPreviewDetector(url="tcp://129.129.95.111:20000", name="preview")
daq.wait_for_connection()

View File

@@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
"""
Ophyd devices for the PX III beamline, including the MX specific Aerotech A3200 stage.
@author: mohacsi_i
"""
from .A3200 import AerotechAbrStage
from .A3200utils import A3200Axis
from .SmarGonA import SmarGonAxis as SmarGonAxisA
from .SmarGonB import SmarGonAxis as SmarGonAxisB
from .StdDaqPreview import StdDaqPreviewDetector
from .NDArrayPreview import NDArrayPreview
from .SamCamDetector import SamCamDetector
from .PneumaticValve import PneumaticValve

View File

@@ -3,3 +3,9 @@
### pxiii_bec
| Device | Documentation | Module |
| :----- | :------------- | :------ |
| A3200Axis | Positioner wrapper for A3200 axes<br><br><br> Positioner wrapper for motors on the Aerotech A3200 controller. As the IOC<br> does not provide a motor record, this class simply wraps axes into a<br> standard Ophyd positioner for the BEC. It also has some additional<br> functionality for error checking and diagnostics.<br><br> Examples<br> --------<br> omega = A3200Axis('X06DA-ES-DF1:OMEGA', base_pv='X06DA-ES')<br><br> Parameters<br> ----------<br> prefix : str<br> Axis PV name root.<br> base_pv : str (situational)<br> IOC PV name root, i.e. X06DA-ES if standalone class.<br> | [pxiii_bec.devices.A3200utils](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/A3200utils.py) |
| AerotechAbrStage | Standard PX stage on A3200 controller<br><br> This is the wrapper class for the standard rotation stage layout for the PX<br> beamlines at SLS. It wraps the main rotation axis OMEGA (Aerotech ABR)and<br> the associated motion axes GMX, GMY and GMZ. The ophyd class associates to<br> the general PX measurement procedure, which is that the actual scan script<br> is running as an AeroBasic program on the controller and we communicate to<br> it via 10+1 global variables.<br> | [pxiii_bec.devices.A3200](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/A3200.py) |
| NDArrayPreview | Wrapper class around AreaDetector's NDStdArray plugins<br><br> This is a standalone class to display images from AreaDetector's<br> ImagePlugin without using a parent device. It also offers BEC exposed<br> methods to transfer image and change image array Kind-ness.<br><br> NOTE: As an explicit request, it can toggle data recording<br> | [pxiii_bec.devices.NDArrayPreview](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/NDArrayPreview.py) |
| SamCamDetector | Sample camera device<br><br> The SAMCAM continously streams images to the GUI and sample alignment<br> scripts via ZMQ.<br> | [pxiii_bec.devices.SamCamDetector](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/SamCamDetector.py) |
| SmarGonAxis | SmarGon client deice<br><br> This class controls the SmarGon goniometer via the REST interface. All<br> SmarGon axes share a common mutex to manage actual HW access.<br> | [pxiii_bec.devices.SmarGonB](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/SmarGonB.py) |
| StdDaqPreviewDetector | Detector wrapper class around the StdDaq preview image stream.<br><br> This was meant to provide live image stream directly from the StdDAQ but<br> also works with other ARRAY v1 streamers, like the AreaDetector ZMQ plugin.<br> Note that the preview stream must be already throtled in order to cope with<br> the incoming data and the python class might throttle it further.<br><br> NOTE: As an explicit request, it does not record the image data.<br><br> You can add a preview widget to the dock by:<br> cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')<br> | [pxiii_bec.devices.StdDaqPreview](https://gitlab.psi.ch/bec/pxiii_bec/-/blob/main/pxiii_bec/devices/StdDaqPreview.py) |

View File

@@ -1 +1,8 @@
from .my_test_scan import MyScan
from .mx_measurements import (
MeasureStandardWedge,
MeasureVerticalLine,
MeasureRasterSimple,
MeasureScreening,
MeasureHelical,
MeasureHelical2,
)

View File

@@ -0,0 +1,12 @@
# from .metadata_schema_template import ExampleSchema
METADATA_SCHEMA_REGISTRY = {
# Add models which should be used to validate scan metadata here.
# Make a model according to the template, and import it as above
# Then associate it with a scan like so:
# "example_scan": ExampleSchema
}
# Define a default schema type which should be used as the fallback for everything:
DEFAULT_SCHEMA = None

View File

@@ -0,0 +1,34 @@
# # By inheriting from BasicScanMetadata you can define a schema by which metadata
# # supplied to a scan must be validated.
# # This schema is a Pydantic model: https://docs.pydantic.dev/latest/concepts/models/
# # but by default it will still allow you to add any arbitrary information to it.
# # That is to say, when you run a scan with which such a model has been associated in the
# # metadata_schema_registry, you can supply any python dictionary with strings as keys
# # and built-in python types (strings, integers, floats) as values, and these will be
# # added to the experiment metadata, but it *must* contain the keys and values of the
# # types defined in the schema class.
# #
# #
# # For example, say that you would like to enforce recording information about sample
# # pretreatment, you could define the following:
# #
#
# from bec_lib.metadata_schema import BasicScanMetadata
#
#
# class ExampleSchema(BasicScanMetadata):
# treatment_description: str
# treatment_temperature_k: int
#
#
# # If this was used according to the example in metadata_schema_registry.py,
# # then when calling the scan, the user would need to write something like:
# >>> scans.example_scan(
# >>> motor,
# >>> 1,
# >>> 2,
# >>> 3,
# >>> metadata={"treatment_description": "oven overnight", "treatment_temperature_k": 575},
# >>> )
#
# # And the additional metadata would be saved in the HDF5 file created for the scan.

View File

@@ -0,0 +1,473 @@
"""MX measurements module
Scan primitives for standard BEC scans at the PX beamlines at SLS.
Theese scans define the event model and can be called from higher levels.
"""
import time
import numpy as np
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
class AbrCmd:
"""Valid Aerotech ABR scan commands from the AeroBasic files"""
NONE = 0
RASTER_SCAN_SIMPLE = 1
MEASURE_STANDARD = 2
VERTICAL_LINE_SCAN = 3
SCREENING = 4
# SUPER_FAST_OMEGA = 5 # Some Japanese measured samples in capilaries at high RPMs
# STILL_WEDGE = 6 # NOTE: Unused Step scan
# STILLS = 7 # NOTE: Unused Just send triggers to detector
# REPEAT_SINGLE_OSCILLATION = 8 # NOTE: Unused
# SINGLE_OSCILLATION = 9
# OLD_FASHIONED = 10 # NOTE: Unused
# RASTER_SCAN = 11
# JET_ROTATION = 12 # NOTE: Unused
# X_HELICAL = 13 # NOTE: Unused
# X_RUNSEQ = 14 # NOTE: Unused
# JUNGFRAU = 15
# MSOX = 16 # NOTE: Unused
# SLIT_SCAN = 17 # NOTE: Unused
# RASTER_SCAN_STILL = 18
# SCAN_SASTT = 19
# SCAN_SASTT_V2 = 20
# SCAN_SASTT_V3 = 21
class AerotechFlyscanBase(AsyncFlyScanBase):
"""Base class for MX flyscans
Low-level base class for standard scans at the PX beamlines at SLS. Theese
scans use the A3200 rotation stage and the actual experiment is performed
using an AeroBasic script controlled via global variables. The base class
has some basic safety features like checking status then sets globals and
fires off the scan. Implementations can choose to set the corresponding
configurations in child classes or pass it as command line parameters.
IMPORTANT: The AeroBasic scripts take care of the PSO configuration.
Parameters:
-----------
abr_complete : bool
Wait for the launched ABR task to complete.
"""
scan_type = "fly"
scan_report_hint = "table"
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
# Aerotech stage config
abr_raster_reset = False
abr_complete = False
abr_timeout = None
pointID = 0
num_pos = 0
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
parameters.
"""
super().__init__(parameter=parameter, **kwargs)
if "abr_raster_reset" in self.caller_kwargs:
self.abr_raster_reset = self.caller_kwargs.get("abr_raster_reset")
if "abr_complete" in self.caller_kwargs:
self.abr_complete = self.caller_kwargs.get("abr_complete")
if "abr_timeout" in self.caller_kwargs:
self.abr_timeout = self.caller_kwargs.get("abr_timeout")
def pre_scan(self):
"""Mostly just checking if ABR stage is ok..."""
# TODO: Move roughly to start position???
# ABR status checking
stat = yield from self.stubs.send_rpc_and_wait("abr", "status.get")
if stat not in (0, "OK"):
raise RuntimeError("Aerotech ABR seems to be in error state {stat}, please reset")
task = yield from self.stubs.send_rpc_and_wait("abr", "task1.get")
# From what I got values are copied to local vars at the start of scan,
# so only kickoff should be forbidden.
if task not in (1, "OK"):
raise RuntimeError("Aerotech ABR task #1 seems to busy")
# Reset the raster scan engine
if self.abr_raster_reset:
yield from self.stubs.send_rpc_and_wait("abr", "raster_scan_done.set", 0)
# Call super
yield from super().pre_scan()
def scan_core(self):
"""The actual scan logic comes here."""
# Kick off the run
yield from self.stubs.send_rpc_and_wait("abr", "bluekickoff")
logger.info("Measurement launched on the ABR stage...")
# Wait for grid scanner to finish
if self.abr_complete:
if self.abr_timeout is not None:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.abr_timeout)
st.wait()
else:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete")
st.wait()
def cleanup(self):
"""Set scan progress to 1 to finish the scan"""
self.num_pos = 1
return super().cleanup()
class MeasureStandardWedge(AerotechFlyscanBase):
"""Standard wedge scan using the OMEGA motor
Measure an absolute continous line scan from `start` to `start` + `range`
during `move_time` on the Omega axis with PSO output.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.standard_wedge(start=42, range=10, move_time=20)
Parameters
----------
start : (float, float)
Scan start position of the axis.
range : (float, float)
Scan range of the axis.
move_time : (float)
Total travel time for the movement [s].
ready_rate : float, optional
No clue what is this... (default=500)
"""
scan_name = "standardscan"
required_kwargs = ["start", "range", "move_time"]
class MeasureVerticalLine(AerotechFlyscanBase):
"""Vertical line scan using the GMY motor
Simple relative continous line scan that records a single vertical line
with PSO output. There's no actual stepping, it's only used for velocity
calculation.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_vline(range_y=12, steps_y=40, exp_time=0.1)
Parameters
----------
range : float
Step size [mm].
steps : int
Scan range of the axis.
exp_time : float
Eeffective exposure time per step [s]
"""
scan_name = "vlinescan"
required_kwargs = ["exp_time", "range", "steps"]
class MeasureRasterSimple(AerotechFlyscanBase):
"""Simple raster scan
Measure a simplified relative zigzag raster scan in the X-Y plane.
The scan is relative assumes the goniometer is currently at the CENTER of
the first cell (i.e. TOP-LEFT). Each line is executed as a single continous
movement, i.e. there's no actual stepping in the X direction.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.raster_simple(exp_time=0.1, range_x=4, range_y=4, steps_x=80, steps_y=80)
Parameters
----------
exp_time : float
Effective exposure time for each cell along the X axis [s].
range_x : float
Scan step size [mm].
range_y : float
Scan step size [mm].
steps_x : int
Number of scan steps in X (fast). Only used for velocity calculation.
steps_y : int
Number of scan steps in Y (slow).
"""
scan_name = "rasterscan"
required_kwargs = ["exp_time", "range_x", "range_y", "steps_x", "steps_y"]
class MeasureScreening(AerotechFlyscanBase):
"""Sample screening scan
Sample screening scan that scans intervals on the rotation axis taking
1 image/interval. This makes sure that we hit diffraction peaks if there
are any crystals.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.measure_screening(start=42, range=180, steps=18, exp_time=0.1, oscrange=2.0)
Parameters
----------
start : float
Absolute scan start position of the omega axis [deg].
range : float
Total screened range of the omega axis relative to 'start' [deg].
steps : int
Number of blurred intervals.
exp_time : float
Exposure time per blurred interval [s].
oscrange : float
Motion blurring of each interval [deg]
delta : float
Safety margin for sub-range movements (default: 0.5) [deg].
"""
scan_name = "screeningscan"
required_kwargs = ["start", "range", "steps", "exp_time", "oscrange"]
class MeasureHelical(AerotechFlyscanBase):
"""Helical scan using the OMEGA motor
Measure an absolute continous line scan from `start` to `start` + `range`
during `move_time` on the Omega axis with PSO output.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.standard_wedge(start=42, range=10, move_time=20)
Parameters
----------
start : float
Scan start position of the axis.
range : float
Scan range of the axis.
move_time : float
Total travel time for the movement [s].
ready_rate : float, optional
No clue what is this... (default=500)
sg_start : (float, float, float, float, float)
Complete SmarGon coordinate in tuple form.
sg_end : (float, float, float, float, float)
Complete SmarGon coordinate in tuple form.
sg_steps : int
Number of steps with SmarGon.
"""
scan_name = "helicalscan"
required_kwargs = ["start", "range", "move_time", "sg_start", "sg_end", "sg_steps"]
def pre_scan(self):
"""Mostly just checking if ABR stage is ok..."""
# Smargon has no velocity control
self.smargon_start = np.array(self.caller_kwargs.get("sg_start"))
self.smargon_end = np.array(self.caller_kwargs.get("sg_end"))
self.smargon_steps = self.caller_kwargs.get("sg_steps")
self.smargon_range = self.smargon_end - self.smargon_start
self.smargon_step_size = self.smargon_range / self.smargon_steps
self.smargon_step_time = self.caller_kwargs.get("move_time") / self.smargon_steps
logger.info(f"Start:\t{self.smargon_start}")
logger.info(f"End:\t{self.smargon_end}")
logger.info(f"Steps:\t{self.smargon_steps}")
logger.info(f"Range:\t{self.smargon_range}")
logger.info(f"StepSize:\t{self.smargon_step_size}")
logger.info(f"StepTime:\t{self.smargon_step_time}")
# TODO: Move roughly to start position???
st0 = yield from self.stubs.send_rpc("shx", "omove", self.smargon_start[0])
st1 = yield from self.stubs.send_rpc("shy", "omove", self.smargon_start[1])
st2 = yield from self.stubs.send_rpc("shz", "omove", self.smargon_start[2])
st3 = yield from self.stubs.send_rpc("chi", "omove", self.smargon_start[3])
st4 = yield from self.stubs.send_rpc("phi", "omove", self.smargon_start[4])
st0.wait()
st1.wait()
st2.wait()
st3.wait()
st4.wait()
# Call super
yield from super().pre_scan()
def scan_core(self):
"""The actual scan logic comes here."""
# Kick off the run
yield from self.stubs.send_rpc_and_wait("abr", "kickoff")
logger.info("Measurement launched on the ABR stage...")
logger.info("Performing SmarGon stepping...")
for ss in range(self.smargon_steps):
sg_pos = self.smargon_start + ss * self.smargon_step_size
# Move to position but don't care
st0 = yield from self.stubs.send_rpc("shx", "omove", sg_pos[0])
st1 = yield from self.stubs.send_rpc("shy", "omove", sg_pos[1])
st2 = yield from self.stubs.send_rpc("shz", "omove", sg_pos[2])
st3 = yield from self.stubs.send_rpc("chi", "omove", sg_pos[3])
st4 = yield from self.stubs.send_rpc("phi", "omove", sg_pos[4])
t_start = time.time()
st0.wait()
st1.wait()
st2.wait()
st3.wait()
st4.wait()
t_end = time.time()
t_elapsed = t_end - t_start
time.sleep(max(self.smargon_step_time - t_elapsed, 0))
# Wait for scan task to finish
if self.abr_complete:
if self.abr_timeout is not None:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.abr_timeout)
st.wait()
else:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete")
st.wait()
class MeasureHelical2(AerotechFlyscanBase):
"""Helical scan using the OMEGA motor
Measure an absolute continous line scan from `start` to `start` + `range`
during `move_time` on the Omega axis with PSO output.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.standard_wedge(start=42, range=10, move_time=20)
Parameters
----------
start : float
Scan start position of the axis.
range : float
Scan range of the axis.
move_time : float
Total travel time for the movement [s].
ready_rate : float, optional
No clue what is this... (default=500)
sg_start : (float, float, float, float, float)
Complete SmarGon coordinate in tuple form.
sg_end : (float, float, float, float, float)
Complete SmarGon coordinate in tuple form.
sg_steps : int
Number of steps with SmarGon.
"""
scan_name = "helicalscan2"
required_kwargs = ["start", "range", "move_time", "sg_start", "sg_end", "sg_steps"]
point_id = 0
# def __init__(self, *args, parameter: dict = None, **kwargs):
# """Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
# parameters.
# """
# self.num_pos = kwargs["sg_steps"]
# super().__init__(*args, parameter=parameter, **kwargs)
def prepare_positions(self):
# Smargon has no velocity control
self.smargon_start = np.array(self.caller_kwargs.get("sg_start"))
self.smargon_end = np.array(self.caller_kwargs.get("sg_end"))
self.smargon_steps = self.caller_kwargs.get("sg_steps")
self.smargon_range = self.smargon_end - self.smargon_start
self.smargon_step_size = self.smargon_range / self.smargon_steps
self.smargon_step_time = self.caller_kwargs.get("move_time") / self.smargon_steps
logger.info(f"Start:\t{self.smargon_start}")
logger.info(f"End:\t{self.smargon_end}")
logger.info(f"Steps:\t{self.smargon_steps}")
logger.info(f"Range:\t{self.smargon_range}")
logger.info(f"StepSize:\t{self.smargon_step_size}")
logger.info(f"StepTime:\t{self.smargon_step_time}")
self.num_pos = self.smargon_steps
self.positions = np.linspace(self.smargon_start, self.smargon_end, self.smargon_steps)
self.start_pos = self.positions[0, :]
# Call super
yield from super().prepare_positions()
# def update_scan_motors(self):
# """ Update step scan motors"""
# self.scan_motors = ['shx', 'shy', 'shz', 'chi', 'phi']
def pre_scan(self):
"""Mostly just checking if ABR stage is ok..."""
# Move roughly to start position
st0 = yield from self.stubs.send_rpc("shx", "omove", self.start_pos[0])
st1 = yield from self.stubs.send_rpc("shy", "omove", self.start_pos[1])
st2 = yield from self.stubs.send_rpc("shz", "omove", self.start_pos[2])
st3 = yield from self.stubs.send_rpc("chi", "omove", self.start_pos[3])
st4 = yield from self.stubs.send_rpc("phi", "omove", self.start_pos[4])
st0.wait()
st1.wait()
st2.wait()
st3.wait()
st4.wait()
# print(f"\n\n{self.readout_priority}\n\n")
# Call super
yield from super().pre_scan()
def scan_core(self):
"""The actual scan logic comes here."""
# Kick off the run
yield from self.stubs.send_rpc_and_wait("abr", "kickoff")
logger.info("Measurement launched on the ABR stage...")
logger.info("Performing SmarGon stepping...")
for _, sg_pos in enumerate(self.positions):
# sg_pos = self.smargon_start + ss * self.smargon_step_size
# Move to position but don't care
st0 = yield from self.stubs.send_rpc("shx", "omove", sg_pos[0])
st1 = yield from self.stubs.send_rpc("shy", "omove", sg_pos[1])
st2 = yield from self.stubs.send_rpc("shz", "omove", sg_pos[2])
st3 = yield from self.stubs.send_rpc("chi", "omove", sg_pos[3])
st4 = yield from self.stubs.send_rpc("phi", "omove", sg_pos[4])
t_start = time.time()
st0.wait()
st1.wait()
st2.wait()
st3.wait()
st4.wait()
t_end = time.time()
t_elapsed = t_end - t_start
time.sleep(max(self.smargon_step_time - t_elapsed, 0))
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
# Wait for scan task to finish
if self.abr_complete:
if self.abr_timeout is not None:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.abr_timeout)
st.wait()
else:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete")
st.wait()

View File

@@ -1,10 +0,0 @@
from bec_server.scan_server.scans import LineScan
class MyScan(LineScan):
scan_name = "test_scan"
def _calculate_positions(self):
return super()._calculate_positions()

View File

@@ -0,0 +1,49 @@
import numpy as np
from scipy.ndimage import gaussian_filter1d
from lmfit.models import GaussianModel
def alignment_fit_and_plot(
history_index: int,
device_name: str,
signal_name: str | None = None,
smoothing_sigma: float = 2.0,
):
"""
Get data for a completed scan from the BEC history, apply smoothing, gaussian fit,
gradient, and plot all the results.
Args:
history_index (int): scan to fetch, e.g. -1 for the most recent scan
device_name (str): the device for which to get the monitoring data
"""
# Fetch scan data from the history
# by default, signal = device name, unless otherwise specified
signal = signal_name or device_name
scan = bec.history[history_index]
md = scan.metadata["bec"]
data = scan.devices[device_name][signal].read()["value"]
# motor name is a bytes object in the metadata, so make a string
motor_name = md["scan_motors"][0].decode()
# Create a plot and a text box to display results
dock_area = bec.gui.new()
wf = dock_area.new().new(bec.gui.available_widgets.Waveform)
wf.title = f"Scan {md['scan_number']}: {md['scan_name']} of {motor_name}"
text = dock_area.new(position="right").new(widget=bec.gui.available_widgets.TextBox)
# Calculate some processed data and add everything to the plot
wf.plot(data, label="Raw data")
smoothed_data = gaussian_filter1d(data, smoothing_sigma)
wf.plot(smoothed_data, label="Smoothed")
gradient = np.gradient(smoothed_data)
wf.plot(gradient, label="gradient")
# Fit a Gaussian model to the smoothed data and show the fitting parameters in the textbox
x_data = scan.devices[motor_name][motor_name].read()["value"]
model = GaussianModel()
result = model.fit(smoothed_data, x=x_data)
text.set_plain_text(f"Fit parameters: \n{result.params.pretty_repr()}")
return result

View File

@@ -0,0 +1,77 @@
# pylint: disable=undefined-variable
# import bec
# import bec_lib.devicemanager.DeviceContainer as dev
import time
def rock(steps, exp_time, scan_start=None, scan_end=None, datasource=None, visual=True, **kwargs):
"""Demo step scan with plotting
This is a simple user-space demo step scan with the BEC. It be a
standard BEC scan, while still setting up the environment.
Example:
--------
ascan(dev.dccm_energy, 12,13, steps=21, exp_time=0.1, datasource=dev.dccm_xbpm)
"""
# Dummy method to check beamline status
if not bl_check_beam():
raise RuntimeError("Beamline is not in ready state")
motor = dev.dccm_theta2
if scan_start is None:
scan_start = -0.05 / dev.dccm_energy.user_readback.get()
if scan_end is None:
scan_end = 0.05 / dev.dccm_energy.user_readback.get()
if visual:
# Get or create scan specific window
window = None
for _, val in bec.gui.windows.items():
if val.title == "CurrentScan":
window = val.widget
window.clear_all()
if window is None:
window = bec.gui.new("CurrentScan")
# Draw a simploe plot in the window
dock = window.add_dock(f"ScanDisplay {motor}")
plt1 = dock.add_widget("BECWaveformWidget")
plt1.plot(x_name=motor, y_name=datasource)
plt1.set_x_label(motor)
plt1.set_y_label(datasource)
plt1.add_dap(motor, datasource, dap="LinearModel")
window.show()
print("Handing over to 'scans.line_scan'")
s = scans.line_scan(
motor,
scan_start,
scan_end,
steps=steps,
exp_time=exp_time,
datasource=datasource,
relative=True,
**kwargs,
)
if visual:
# If fitting via GUI
firt_par = plt1.get_dap_params()
else:
# Without GUI
firt_par = bec.dap.LinearModel.fit(
s, motor.name, motor.name, datasource.name, datasource.name
)
# TODO: Validate fitted position
# TODO: Move to fitted maximum
return s, firt_par
def monitor(device, steps, t=1):
for _ in range(steps):
print(device.read())
time.sleep(t)

30
pxiii_bec/scripts/kat.py Normal file
View File

@@ -0,0 +1,30 @@
def scan_theta2(scan_start, scan_end, stepno, exp):
# Save the motor starting position
start_value = dev.dccm_theta2.read()['dccm_theta2']['value']
print(f"Motor position is {start_value}")
# Run the scan
s = scans.line_scan(dev.dccm_theta2, scan_start, scan_end, steps=stepno, exp_time=exp, relative=True)
# data = s.devices[dccm_xbpm][dccm_xbpm].read()["value"]
# Move motor back to starting position and print XBPM reading
umv(dev.dccm_theta2, start_value)
xbpm_reading = dev.dccm_xbpm.read()['dccm_xbpm']['value']
print(f"Moving dccm_theta2 back to start position of where XBPM Reading is {xbpm_reading}")
end_value = dev.dccm_theta2.read()['dccm_theta2']['value']
print(f"Motor was at {start_value} before the scan, now at {end_value}")
# # Create a plot to display the results
dock_area = bec.gui.new()
wf = dock_area.new().new(bec.gui.available_widgets.Waveform)
wf.title = f"Scan of DCCM_theta2"
wf.plot(x_name='dccm_theta2', y_name='dccm_xbpm')
wf.add_dap_curve(device_label='dccm_xbpm-dccm_xbpm', dap_name='GaussianModel')
print(dap_xbpm.dap_params)

View File

@@ -0,0 +1,69 @@
# pylint: disable=undefined-variable
def bl_check_beam():
"""Check beamline status before scan"""
return True
def ascan(
motor, scan_start, scan_end, steps, exp_time, plot=None, visual=True, relative=False, **kwargs
):
"""Demo step scan with plotting
This is a simple user-space demo step scan with the BEC. It be a
standard BEC scan, while still setting up the environment.
Example:
--------
ascan(dev.dccm_energy, 12,13, steps=21, exp_time=0.1, datasource=dev.dccm_xbpm)
"""
# Dummy method to check beamline status
if not bl_check_beam():
raise RuntimeError("Beamline is not in ready state")
if visual:
# Get or create scan specific window
window = None
for _, val in bec.gui.windows.items():
if val.title == "CurrentScan":
window = val.widget
window.clear_all()
if window is None:
window = bec.gui.new("CurrentScan")
# Draw a simploe plot in the window
dock = window.add_dock(f"ScanDisplay {motor}")
plt1 = dock.add_widget("BECWaveformWidget")
plt1.plot(x_name=motor, y_name=plot)
plt1.set_x_label(motor)
plt1.set_y_label(plot)
plt1.add_dap(motor, plot, dap="LinearModel")
window.show()
print("Handing over to 'scans.line_scan'")
s = scans.line_scan(
motor,
scan_start,
scan_end,
steps=steps,
exp_time=exp_time,
plot=plot,
relative=relative,
**kwargs,
)
if visual:
# Fitting via GUI
firt_par = plt1.get_dap_params()
else:
# Fitting without GUI
firt_par = bec.dap.LinearModel.fit(s, motor.name, motor.name, plot.name, plot.name)
# # Some basic fit
# dkey = datasource.full_name
# NOTE: s.scan.data == bec.history[-1]
# datapoints = bec.history[-1].devices[dkey].read()[dkey]['value']
# positions
return s, firt_par

View File

View File

@@ -5,7 +5,7 @@ build-backend = "hatchling.build"
[project]
name = "pxiii_bec"
version = "0.0.0"
description = "Custom device implementations based on the ophyd hardware abstraction layer"
description = "A plugin repository for BEC"
requires-python = ">=3.10"
classifiers = [
"Development Status :: 3 - Alpha",
@@ -16,30 +16,34 @@ dependencies = [
"bec_ipython_client",
"bec_lib",
"bec_server",
"bec_widgets",
"ophyd_devices",
"std_daq_client",
"rich",
"pyepics",
"pandas~=2.0",
"matplotlib",
"zmq",
]
[project.optional-dependencies]
dev = [
"black",
"copier",
"isort",
"coverage",
"pylint",
"pytest",
"pytest-random-order",
"pytest-redis",
"ophyd_devices",
"bec_server",
]
[project.entry-points."bec"]
plugin_bec = "pxiii_bec"
[project.entry-points."bec.deployment.device_server"]
plugin_ds_startup = "pxiii_bec.deployment.device_server.startup:run"
plugin_ds_startup = "pxiii_bec.deployments.device_server.startup:run"
[project.entry-points."bec.file_writer"]
plugin_file_writer = "pxiii_bec.file_writer"
@@ -47,12 +51,18 @@ plugin_file_writer = "pxiii_bec.file_writer"
[project.entry-points."bec.scans"]
plugin_scans = "pxiii_bec.scans"
[project.entry-points."bec.scans.metadata_schema"]
plugin_metadata_schema = "pxiii_bec.scans.metadata_schema"
[project.entry-points."bec.ipython_client_startup"]
plugin_ipython_client_pre = "pxiii_bec.bec_ipython_client.startup.pre_startup"
plugin_ipython_client_post = "pxiii_bec.bec_ipython_client.startup"
[project.entry-points."bec.widgets.auto_updates"]
plugin_widgets_update = "pxiii_bec.bec_widgets.auto_updates:PlotUpdate"
plugin_widgets_update = "pxiii_bec.bec_widgets.auto_updates"
[project.entry-points."bec.widgets.user_widgets"]
plugin_widgets = "pxiii_bec.bec_widgets.widgets"
[tool.hatch.build.targets.wheel]
include = ["*"]

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -0,0 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).

View File

@@ -1,31 +1,34 @@
# Getting Started with Testing using pytest
BEC is using the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be install via
``` bash
BEC is using the [pytest](https://docs.pytest.org/en/latest/) framework.
It can be installed via
```bash
pip install pytest
```
in your *python environment*.
in your _python environment_.
We note that pytest is part of the optional-dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest to sort tests of different submodules, i.e. `scans` or `devices` in the respective folder structure, and to folow a naming convention of `<test_module_name.py>`.
It is mandatory for test files to begin with `test_` for pytest to discover them.
To run all tests, navigate to the directory of the plugin from the command line, and run the command
To run all tests, navigate to the directory of the plugin from the command line, and run the command
``` bash
```bash
pytest -v --random-order ./tests
```
Note, the python environment needs to be active.
The additional arg `-v` allows pytest to run in verbose mode which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests can be quite specific for the given function.
Writing tests can be quite specific for the given function.
We recommend writing tests as isolated as possible, i.e. try to test single functions instead of full classes.
A very useful class to enable isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we also recommend to take a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).