Development #21

Merged
mohacsi_i merged 11 commits from development into main 2025-05-21 11:04:55 +02:00
16 changed files with 945 additions and 202 deletions

View File

@@ -0,0 +1,135 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>1801</width>
<height>1459</height>
</rect>
</property>
<property name="minimumSize">
<size>
<width>1801</width>
<height>1459</height>
</size>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QTabWidget" name="tabWidget">
<property name="currentIndex">
<number>0</number>
</property>
<widget class="QWidget" name="tab">
<attribute name="title">
<string>Control Panel</string>
</attribute>
<layout class="QGridLayout" name="gridLayout" rowstretch="3,4" columnstretch="2,5">
<item row="0" column="0">
<widget class="Waveform" name="waveform"/>
</item>
<item row="0" column="1">
<widget class="ScanControl" name="scan_control"/>
</item>
<item row="1" column="0">
<widget class="ScanHistory" name="scan_history"/>
</item>
<item row="1" column="1">
<widget class="BECQueue" name="bec_queue"/>
</item>
</layout>
</widget>
<widget class="QWidget" name="tab_2">
<attribute name="title">
<string>Logbook</string>
</attribute>
<layout class="QGridLayout" name="gridLayout_3">
<item row="0" column="0">
<widget class="QLabel" name="label">
<property name="font">
<font>
<pointsize>24</pointsize>
</font>
</property>
<property name="text">
<string>Coming soon...</string>
</property>
</widget>
</item>
</layout>
</widget>
<widget class="QWidget" name="tab_3">
<attribute name="title">
<string>Take a break</string>
</attribute>
<layout class="QGridLayout" name="gridLayout_2">
<item row="0" column="0">
<widget class="Minesweeper" name="minesweeper"/>
</item>
<item row="0" column="1">
<spacer name="horizontalSpacer">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>1073</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
<item row="1" column="0">
<spacer name="verticalSpacer">
<property name="orientation">
<enum>Qt::Orientation::Vertical</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>20</width>
<height>40</height>
</size>
</property>
</spacer>
</item>
</layout>
</widget>
</widget>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>ScanControl</class>
<extends>QWidget</extends>
<header>scan_control</header>
</customwidget>
<customwidget>
<class>Waveform</class>
<extends>QWidget</extends>
<header>waveform</header>
</customwidget>
<customwidget>
<class>BECQueue</class>
<extends>QWidget</extends>
<header>bec_queue</header>
</customwidget>
<customwidget>
<class>Minesweeper</class>
<extends>QWidget</extends>
<header>minesweeper</header>
</customwidget>
<customwidget>
<class>ScanHistory</class>
<extends>QWidget</extends>
<header>scan_history</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -1,71 +1,71 @@
from bec_widgets.cli.auto_updates import AutoUpdates, ScanInfo
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_lib.messages import ScanStatusMessage
from bec_widgets.cli.rpc.rpc_base import RPCResponseTimeoutError
class PlotUpdate(AutoUpdates):
create_default_dock = True
enabled = True
_scan_msg = None
def do_update(self, msg):
"""Save the original scan message for future use"""
self._scan_msg = msg
return super().do_update(msg)
#######################################################################
################# GUI Callbacks #######################################
#######################################################################
# def simple_line_scan(self, info: ScanInfo) -> None:
# """
# Simple line scan.
# """
# dev_x = info.scan_report_devices[0]
# dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
# if not dev_y:
# return
# self.figure.clear_all()
# plt = self.figure.plot(dev_x, dev_y)
# plt.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def keyword_handler(self, info: ScanInfo) -> None:
"""Simple keyword handler
This simple keyword handler looks for the keyword 'datasource' in the scan arguments.
This allows the user to explictly specify the desired data source. Useful for alignment
scans.
def on_start(self) -> None:
"""
dev_x = info.scan_report_devices[0]
if "kwargs" in self._scan_msg.info:
dev_y = self._scan_msg.info["kwargs"].get("datasource", None)
else:
dev_y = None
if not dev_y:
return
Procedure to run when the auto updates are enabled.
"""
self.start_default_dock()
plt1 = self.get_default_figure()
try:
# This will throw RPCResponseTimeoutError
plt1.clear_all()
except RPCResponseTimeoutError:
pass
try:
# TODO: What about 2D scans?
# This will throw RPCResponseTimeoutError
plt1.plot(x_name=dev_x, y_name=dev_y)
except RPCResponseTimeoutError:
pass
try:
# This will throw RPCResponseTimeoutError
plt1.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
except RPCResponseTimeoutError:
pass
# plt1.add_dap(dev_x, dev_y, dap="LinearModel")
def on_stop(self) -> None:
"""
Procedure to run when the auto updates are disabled.
"""
def handler(self, info: ScanInfo) -> None:
"""Dock configuration handler"""
# EXAMPLES:
# if info.scan_name == "line_scan" and info.scan_report_devices:
# self.simple_line_scan(info)
# return
# if info.scan_name == "grid_scan" and info.scan_report_devices:
# self.run_grid_scan_update(info)
# return
super().handler(info)
self.keyword_handler(info)
def on_scan_open(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan starts.
Args:
msg (ScanStatusMessage): The scan status message.
"""
if msg.scan_name == "line_scan" and msg.scan_report_devices:
return self.simple_line_scan(msg)
if msg.scan_name == "grid_scan" and msg.scan_report_devices:
return self.simple_grid_scan(msg)
dev_x = msg.scan_report_devices[0]
if "kwargs" in msg.request_inputs:
dev_y = msg.request_inputs["kwargs"].get("plot", None)
if dev_y is not None:
# Set the dock to the waveform widget
wf = self.set_dock_to_widget("Waveform")
# Clear the waveform widget and plot the data
wf.clear_all()
wf.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {msg.info.scan_number} - {dev_y}",
title=f"Scan {msg.info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
elif msg.scan_report_devices:
return self.best_effort(msg)
return None
def on_scan_closed(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan ends.
Args:
msg (ScanStatusMessage): The scan status message.
"""
def on_scan_abort(self, msg: ScanStatusMessage) -> None:
"""
Procedure to run when a scan is aborted.
Args:
msg (ScanStatusMessage): The scan status message.
"""

View File

@@ -0,0 +1,40 @@
# This file was automatically generated by generate_cli.py
# type: ignore
from __future__ import annotations
from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
logger = bec_logger.logger
# pylint: skip-file
_Widgets = {
"ScanHistory": "ScanHistory",
}
class ScanHistory(RPCBase):
@rpc_call
def select_scan_from_history(self, value: "int") -> "None":
"""
Set scan from CLI.
Args:
value (int) : value from history -1 ...-10000
"""
@rpc_call
def add_scan_from_history(self) -> "None":
"""
Load selected scan from history.
"""
@rpc_call
def clear_plot(self) -> "None":
"""
Delete all curves on the plot.
"""

View File

@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from pxiii_bec.bec_widgets.widgets.scan_history.scan_history_plugin import ScanHistoryPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(ScanHistoryPlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -0,0 +1,191 @@
from __future__ import annotations
import os
from typing import TYPE_CHECKING, TypedDict
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.ui_loader import UILoader
from qtpy.QtWidgets import QVBoxLayout, QWidget
logger = bec_logger.logger
if TYPE_CHECKING:
from qtpy.QtWidgets import QPushButton, QLabel, QSpinBox
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.widgets.control.device_input.device_combobox.device_combobox import DeviceComboBox
from bec_widgets.widgets.editors.text_box.text_box import TextBox
class ScanHistoryUIComponents(TypedDict):
waveform: Waveform
metadata_text_box: TextBox
monitor_label: QLabel
monitor_combobox: DeviceComboBox
history_label: QLabel
history_spin_box: QSpinBox
history_add: QPushButton
history_clear: QPushButton
class ScanHistory(BECWidget, QWidget):
USER_ACCESS = ["select_scan_from_history", "add_scan_from_history", "clear_plot"]
PLUGIN = True
ui_file = "./scan_history.ui"
components: ScanHistoryUIComponents
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self._load_ui()
def _load_ui(self):
current_path = os.path.dirname(__file__)
self.ui = UILoader(self).loader(os.path.join(current_path, self.ui_file))
layout = QVBoxLayout()
layout.addWidget(self.ui)
self.setLayout(layout)
self.components: ScanHistoryUIComponents = {
"waveform" : self.ui.waveform,
"metadata_text_box" : self.ui.metadata_text_box,
"monitor_label" : self.ui.monitor_label,
"monitor_combobox" : self.ui.monitor_combobox,
"history_label" : self.ui.history_label,
"history_spin_box" : self.ui.history_spin_box,
"history_add" : self.ui.history_add,
"history_clear" : self.ui.history_clear,
}
icon_options = {"size": (16, 16), "convert_to_pixmap": False}
self.components['monitor_combobox'].apply_filter = False
self.components['monitor_combobox'].devices = ['dccm_diode_bottom', 'dccm_diode_top']
self.components['history_spin_box'].setMinimum(-10000)
self.components['history_spin_box'].setMaximum(-1)
self.components['history_spin_box'].valueChanged.connect(self._scan_history_selected)
self._scan_history_selected(-1)
self.components['history_spin_box'].setValue(-1)
self.components['history_add'].setText("Load")
self.components['history_add'].setStyleSheet(
"background-color: #129490; color: white; font-weight: bold; font-size: 12px;"
)
self.components['history_clear'].setText("Clear")
self.components['history_clear'].setStyleSheet(
"background-color: #065143; color: white; font-weight: bold; font-size: 12px;"
)
self.components['history_add'].clicked.connect(self._refresh_plot)
self.components['history_clear'].clicked.connect(self.clear_plot)
self.setWindowTitle("Scan History")
self._scan_history_selected(-1)
@SafeSlot()
def add_scan_from_history(self) -> None:
"""Load selected scan from history."""
self.components['history_add'].click()
@SafeSlot()
def clear_plot(self) -> None:
"""Delete all curves on the plot."""
self.components['waveform'].clear_all()
@SafeSlot()
def _refresh_plot(self) -> None:
"""Refresh plot."""
spin_box_value = self.components['history_spin_box'].value()
self._check_scan_in_history(spin_box_value)
# Get the data from the client
data = self.client.history[spin_box_value]
# Check that the plot does not already have a curve with the same data
scan_number = int(data.metadata.bec['scan_number'])
monitor_name = self.components['monitor_combobox'].currentText()
# Get signal hints
signal_name = getattr(self.client.device_manager.devices, monitor_name)._hints
signal_name = signal_name[0] if len(signal_name)>0 else signal_name
curve_label = f"Scan-{scan_number}-{monitor_name}-{signal_name}"
if len([curve for curve in self.components['waveform'].curves if curve.config.label == curve_label]):
return
if not hasattr(data.devices, monitor_name):
raise ValueError(f"Device {monitor_name} not found in data.")
# Get scan motors and check that the plot x_axis motor is the same as the scan motor, if not, clear the plot
scan_motors = [motor.decode() for motor in data.metadata.bec['scan_motors']]
x_motor_name = self.components['waveform'].x_mode
if x_motor_name not in scan_motors:
self.clear_plot()
self.components['waveform'].x_mode = x_motor_name = scan_motors[0]
# fetching the data
monitor_data = getattr(data.devices, monitor_name).read()[signal_name]['value']
motor_data = getattr(data.devices, x_motor_name).read()[x_motor_name]['value']
# Plot custom curve, with custom label
self.components['waveform'].plot(x=motor_data, y=monitor_data, label=curve_label)
x_label = f"{x_motor_name} / [{getattr(self.client.device_manager.devices, x_motor_name).egu()}]"
self.components['waveform'].x_label = x_label
def _check_scan_in_history(self, history_value:int) -> None:
"""
Check if scan is in history.
Args:
history_value (int): Value from history -1...-10000
"""
if len(self.client.history) < abs(history_value):
self.components['metadata_text_box'].set_plain_text(f"Scan history does not have the request scan {history_value} of history with length: {len(self.client.history)}")
return
def select_scan_from_history(self, value:int) -> None:
"""
Set scan from CLI.
Args:
value (int) : value from history -1 ...-10000
"""
if value >=0:
raise ValueError(f"Value must be smaller or equal -1, provided {value}")
self.components['history_spin_box'].setValue(value)
@SafeSlot(int)
def _scan_history_selected(self, spin_box_value:int) -> None:
self._check_scan_in_history(spin_box_value)
data = self.client.history[spin_box_value]
data.metadata.bec['scan_motors'][0].decode()
text = str(data)
scan_motor_text = "\n" + "Scan Motors: "
for motor in data.metadata.bec['scan_motors']:
scan_motor_text += f" {motor.decode()}"
self.components['metadata_text_box'].set_plain_text(text + scan_motor_text)
@SafeSlot(str)
def _set_x_axis(self, device_x:str) -> None:
self.components['waveform'].x_mode = device_x
@SafeSlot(str)
def _plot_new_device(self, device:str) -> None:
# if len(curve for curve in self.components["waveform"].curves if curve.config.label == f"{device}-{device}":
self.components["waveform"].plot(device)
if __name__ == "__main__": # pragma: no cover
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
widget = ScanHistory()
widget.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1 @@
{'files': ['scan_history.py']}

View File

@@ -0,0 +1,115 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>955</width>
<height>796</height>
</rect>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout_2" stretch="9,3">
<item>
<widget class="Waveform" name="waveform">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>0</width>
<height>0</height>
</size>
</property>
</widget>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout_2">
<item>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QLabel" name="monitor_label">
<property name="font">
<font/>
</property>
<property name="text">
<string>BPM Monitor</string>
</property>
</widget>
</item>
<item>
<widget class="DeviceComboBox" name="monitor_combobox"/>
</item>
<item>
<widget class="QLabel" name="history_label">
<property name="text">
<string>Scan History</string>
</property>
</widget>
</item>
<item>
<widget class="QSpinBox" name="history_spin_box"/>
</item>
<item>
<widget class="QPushButton" name="history_add">
<property name="text">
<string>Add scan</string>
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="history_clear">
<property name="text">
<string>clear all</string>
</property>
</widget>
</item>
</layout>
</item>
<item>
<widget class="TextBox" name="metadata_text_box">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="MinimumExpanding">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>795</width>
<height>191</height>
</size>
</property>
</widget>
</item>
</layout>
</item>
</layout>
</widget>
<customwidgets>
<customwidget>
<class>TextBox</class>
<extends>QWidget</extends>
<header>text_box</header>
</customwidget>
<customwidget>
<class>DeviceComboBox</class>
<extends>QComboBox</extends>
<header>device_combobox</header>
</customwidget>
<customwidget>
<class>Waveform</class>
<extends>QWidget</extends>
<header>waveform</header>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@@ -0,0 +1,54 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from pxiii_bec.bec_widgets.widgets.scan_history.scan_history import ScanHistory
DOM_XML = """
<ui language='c++'>
<widget class='ScanHistory' name='scan_history'>
</widget>
</ui>
"""
class ScanHistoryPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = ScanHistory(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(ScanHistory.ICON_NAME)
def includeFile(self):
return "scan_history"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "ScanHistory"
def toolTip(self):
return "ScanHistory"
def whatsThis(self):
return self.toolTip()

View File

@@ -1,40 +1,127 @@
sldi_cenx:
description: FE slit-diaphragm horizontal center
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENX'}
sls_current:
description: sls current
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'ARS07-DPCT-0100:CURR', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
vg0_press:
description: VG0 pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-FE-VMCC-0000:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
abs_press:
description: Absorber pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-FE-ABS1-VMCC-1010:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: true
softwareTrigger: false
sldi_cenx:
description: FE slit-diaphragm horizontal center
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENTERX'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_sizex:
description: FE slit-diaphragm horizontal size
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:GAPX'}
deviceConfig: {prefix: 'X06DA-FE-SLDI:SIZEX'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_ceny:
description: FE slit-diaphragm vertical center
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENY'}
deviceConfig: {prefix: 'X06DA-FE-SLDI:CENTERY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
sldi_sizey:
description: FE slit-diaphragm vertical size
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-SLDI:GAPY'}
deviceConfig: {prefix: 'X06DA-FE-SLDI:SIZEY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
fecmi_try:
description: FE collimating mirror try
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-MI1:TRY'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
fecmi_pitch:
description: FE collimating mirror pitch
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-MI1:PITCH'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
fecmi_bend:
description: FE collimating mirror bend
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-FE-MI1:BEND1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
deviceTags:
- fe
readOnly: false
softwareTrigger: false
slh_press:
description: OP slit pressure
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-SLH-VMFR-1010:PRESSURE', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
slh_trxr:
description: OP slit inner blade motion
deviceClass: ophyd.EpicsMotor
@@ -65,14 +152,23 @@ fi1_try:
dccm_theta1:
description: Monochromator pitch 1
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH1'}
deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
dccm_diode:
description: Diode between mono crystals
dccm_diode_top:
description: Top diode between mono crystals
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XPM1:TOP:READOUT', auto_monitor: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
dccm_diode_bottom:
description: Bottom diode between mono crystals
deviceClass: ophyd.EpicsSignalRO
deviceConfig: {read_pv: 'X06DA-OP-XPM1:BOT:READOUT', auto_monitor: true}
onFailure: buffer
@@ -83,7 +179,7 @@ dccm_diode:
dccm_theta2:
description: Monochromator pitch 2
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-OP-DCCM:PITCH2'}
deviceConfig: {prefix: 'X06DA-OP-DCCM:THETA2'}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -545,24 +641,7 @@ backlight:
readoutPriority: baseline
readOnly: false
softwareTrigger: false
det_y:
description: Pilatus height
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
det_z:
description: Pilatus translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
@@ -657,3 +736,22 @@ phi:
readoutPriority: monitored
readOnly: false
softwareTrigger: false
det_y:
description: Pilatus height
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRY1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false
det_z:
description: Pilatus translation
deviceClass: ophyd.EpicsMotor
deviceConfig: {prefix: 'X06DA-ES-DET:TRZ1'}
onFailure: buffer
enabled: true
readoutPriority: monitored
readOnly: false
softwareTrigger: false

View File

@@ -51,9 +51,9 @@ Examples
"""
import time
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.bec_device_base import BECDeviceBase, CustomPrepare
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
try:
from .A3200enums import AbrCmd, AbrMode
@@ -67,90 +67,7 @@ logger = bec_logger.logger
# pylint: disable=logging-fstring-interpolation
class AerotechAbrMixin(CustomPrepare):
"""Configuration class for the Aerotech A3200 controller for the ABR stage"""
def on_stage(self):
"""
NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything.
"""
logger.warning(f"Configuring {self.parent.scaninfo.scan_msg.info['scan_name']} on ABR")
d = {}
if self.parent.scaninfo.scan_type in ("measure", "measurement", "fly"):
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
scanname = self.parent.scaninfo.scan_msg.info["scan_name"]
if scanname in (
"standardscan",
"helicalscan",
"helicalscan1",
"helicalscan2",
"helicalscan3",
):
d["scan_command"] = AbrCmd.MEASURE_STANDARD
d["var_1"] = scanargs["start"]
d["var_2"] = scanargs["range"]
d["var_3"] = scanargs["move_time"]
d["var_4"] = scanargs.get("ready_rate", 500)
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("verticallinescan", "vlinescan"):
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
d["var_1"] = scanargs["range"] / scanargs["steps"]
d["var_2"] = scanargs["steps"]
d["var_3"] = scanargs["exp_time"]
d["var_4"] = 0
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("screeningscan"):
d["scan_command"] = AbrCmd.SCREENING
d["var_1"] = scanargs["start"]
d["var_2"] = scanargs["oscrange"]
d["var_3"] = scanargs["exp_time"]
d["var_4"] = scanargs["range"] / scanargs["steps"]
d["var_5"] = scanargs["steps"]
d["var_6"] = scanargs.get("delta", 0.5)
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("rasterscan", "rastersimplescan"):
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
d["var_1"] = scanargs["exp_time"]
d["var_2"] = scanargs["range_x"] / scanargs["steps_x"]
d["var_3"] = scanargs["range_y"] / scanargs["steps_y"]
d["var_4"] = scanargs["steps_x"]
d["var_5"] = scanargs["steps_y"]
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
# Reconfigure if got a valid scan config
if len(d) > 0:
self.parent.configure(d)
# Stage the parent
self.parent.bluestage()
def on_kickoff(self):
"""Kick off parent"""
self.parent.bluekickoff()
def on_unstage(self):
"""Unstage the ABR controller"""
self.parent.blueunstage()
class AerotechAbrStage(BECDeviceBase):
class AerotechAbrStage(PSIDeviceBase, Device):
"""Standard PX stage on A3200 controller
This is the wrapper class for the standard rotation stage layout for the PX
@@ -161,8 +78,7 @@ class AerotechAbrStage(BECDeviceBase):
it via 10+1 global variables.
"""
custom_prepare_cls = AerotechAbrMixin
USER_ACCESS = ["reset", "kickoff", "bluekickoff", "complete", "set_axis_mode", "arm", "disarm"]
USER_ACCESS = ["reset", "kickoff", "complete", "set_axis_mode", "arm", "disarm"]
taskStop = Component(EpicsSignal, "-AERO:TSK-STOP", put_complete=True, kind=Kind.omitted)
status = Component(EpicsSignal, "-AERO:STAT", put_complete=True, kind=Kind.omitted)
@@ -214,6 +130,30 @@ class AerotechAbrStage(BECDeviceBase):
task4 = Component(EpicsSignalRO, "-AERO:TSK4-DONE", auto_monitor=True)
scan_done = Component(EpicsSignal, "-GRD:SCAN-DONE", kind=Kind.config)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
scan_info=None,
**kwargs,
):
# super() will call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
scan_info=scan_info,
**kwargs,
)
def set_axis_mode(self, mode: str, settle_time=0.1) -> None:
"""Set axis mode to direct/measurement mode.
@@ -230,6 +170,82 @@ class AerotechAbrStage(BECDeviceBase):
if mode == "measuring":
self.axisAxesMode.set(AbrMode.MEASURING, settle_time=settle_time).wait()
def on_stage(self):
"""
NOTE: Zac's request is that stage is essentially ARM, i.e. get ready and don't do anything.
"""
d = {}
# FIXME: I don't care about how we fish out config parameters from scan info
scan_args = {
**self.scan_info.msg.request_inputs["inputs"],
**self.scan_info.msg.request_inputs["kwargs"],
**self.scan_info.msg.scan_parameters,
}
scanname = self.scan_info.msg.scan_name
if scanname in (
"standardscan",
"helicalscan",
"helicalscan1",
"helicalscan2",
"helicalscan3",
):
d["scan_command"] = AbrCmd.MEASURE_STANDARD
d["var_1"] = scan_args["start"]
d["var_2"] = scan_args["range"]
d["var_3"] = scan_args["move_time"]
d["var_4"] = scan_args.get("ready_rate", 500)
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("verticallinescan", "vlinescan"):
d["scan_command"] = AbrCmd.VERTICAL_LINE_SCAN
d["var_1"] = scan_args["range"] / scan_args["steps"]
d["var_2"] = scan_args["steps"]
d["var_3"] = scan_args["exp_time"]
d["var_4"] = 0
d["var_5"] = 0
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("screeningscan"):
d["scan_command"] = AbrCmd.SCREENING
d["var_1"] = scan_args["start"]
d["var_2"] = scan_args["oscrange"]
d["var_3"] = scan_args["exp_time"]
d["var_4"] = scan_args["range"] / scan_args["steps"]
d["var_5"] = scan_args["steps"]
d["var_6"] = scan_args.get("delta", 0.5)
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
if scanname in ("rasterscan", "rastersimplescan"):
d["scan_command"] = AbrCmd.RASTER_SCAN_SIMPLE
d["var_1"] = scan_args["exp_time"]
d["var_2"] = scan_args["range_x"] / scan_args["steps_x"]
d["var_3"] = scan_args["range_y"] / scan_args["steps_y"]
d["var_4"] = scan_args["steps_x"]
d["var_5"] = scan_args["steps_y"]
d["var_6"] = 0
d["var_7"] = 0
# d["var_8"] = 0
# d["var_9"] = 0
# Reconfigure if got a valid scan config
if len(d) > 0:
self.configure(d)
# Stage the ABR stage
self.arm()
def on_unstage(self):
"""Unstage the ABR controller"""
self.disarm()
def configure(self, d: dict) -> tuple:
""" " Configure the exposure scripts
@@ -284,14 +300,14 @@ class AerotechAbrStage(BECDeviceBase):
new = self.read_configuration()
return old, new
def bluestage(self):
def arm(self):
"""Bluesky-style stage
Since configuration synchronization is not guaranteed, this does
nothing. The script launched by kickoff().
"""
def bluekickoff(self, timeout=1) -> SubscriptionStatus:
def on_kickoff(self, timeout=1) -> SubscriptionStatus:
"""Kick off the set program"""
self.start_command.set(1).wait()
@@ -304,7 +320,7 @@ class AerotechAbrStage(BECDeviceBase):
status.wait()
# return status
def blueunstage(self, settle_time=0.1):
def disarm(self, settle_time=0.1):
"""Stops current script and releases the axes"""
# Disarm commands
self.scan_command.set(AbrCmd.NONE, settle_time=settle_time).wait()

View File

@@ -0,0 +1,49 @@
import numpy as np
from scipy.ndimage import gaussian_filter1d
from lmfit.models import GaussianModel
def alignment_fit_and_plot(
history_index: int,
device_name: str,
signal_name: str | None = None,
smoothing_sigma: float = 2.0,
):
"""
Get data for a completed scan from the BEC history, apply smoothing, gaussian fit,
gradient, and plot all the results.
Args:
history_index (int): scan to fetch, e.g. -1 for the most recent scan
device_name (str): the device for which to get the monitoring data
"""
# Fetch scan data from the history
# by default, signal = device name, unless otherwise specified
signal = signal_name or device_name
scan = bec.history[history_index]
md = scan.metadata["bec"]
data = scan.devices[device_name][signal].read()["value"]
# motor name is a bytes object in the metadata, so make a string
motor_name = md["scan_motors"][0].decode()
# Create a plot and a text box to display results
dock_area = bec.gui.new()
wf = dock_area.new().new(bec.gui.available_widgets.Waveform)
wf.title = f"Scan {md['scan_number']}: {md['scan_name']} of {motor_name}"
text = dock_area.new(position="right").new(widget=bec.gui.available_widgets.TextBox)
# Calculate some processed data and add everything to the plot
wf.plot(data, label="Raw data")
smoothed_data = gaussian_filter1d(data, smoothing_sigma)
wf.plot(smoothed_data, label="Smoothed")
gradient = np.gradient(smoothed_data)
wf.plot(gradient, label="gradient")
# Fit a Gaussian model to the smoothed data and show the fitting parameters in the textbox
x_data = scan.devices[motor_name][motor_name].read()["value"]
model = GaussianModel()
result = model.fit(smoothed_data, x=x_data)
text.set_plain_text(f"Fit parameters: \n{result.params.pretty_repr()}")
return result

View File

@@ -1,5 +1,8 @@
import bec
import bec_lib.devicemanager.DeviceContainer as dev
# pylint: disable=undefined-variable
# import bec
# import bec_lib.devicemanager.DeviceContainer as dev
import time
def rock(steps, exp_time, scan_start=None, scan_end=None, datasource=None, visual=True, **kwargs):
@@ -66,3 +69,9 @@ def rock(steps, exp_time, scan_start=None, scan_end=None, datasource=None, visua
# TODO: Move to fitted maximum
return s, firt_par
def monitor(device, steps, t=1):
for _ in range(steps):
print(device.read())
time.sleep(t)

30
pxiii_bec/scripts/kat.py Normal file
View File

@@ -0,0 +1,30 @@
def scan_theta2(scan_start, scan_end, stepno, exp):
# Save the motor starting position
start_value = dev.dccm_theta2.read()['dccm_theta2']['value']
print(f"Motor position is {start_value}")
# Run the scan
s = scans.line_scan(dev.dccm_theta2, scan_start, scan_end, steps=stepno, exp_time=exp, relative=True)
# data = s.devices[dccm_xbpm][dccm_xbpm].read()["value"]
# Move motor back to starting position and print XBPM reading
umv(dev.dccm_theta2, start_value)
xbpm_reading = dev.dccm_xbpm.read()['dccm_xbpm']['value']
print(f"Moving dccm_theta2 back to start position of where XBPM Reading is {xbpm_reading}")
end_value = dev.dccm_theta2.read()['dccm_theta2']['value']
print(f"Motor was at {start_value} before the scan, now at {end_value}")
# # Create a plot to display the results
dock_area = bec.gui.new()
wf = dock_area.new().new(bec.gui.available_widgets.Waveform)
wf.title = f"Scan of DCCM_theta2"
wf.plot(x_name='dccm_theta2', y_name='dccm_xbpm')
wf.add_dap_curve(device_label='dccm_xbpm-dccm_xbpm', dap_name='GaussianModel')
print(dap_xbpm.dap_params)

View File

@@ -1,5 +1,4 @@
import bec
import bec_lib.devicemanager.DeviceContainer as dev
# pylint: disable=undefined-variable
def bl_check_beam():
@@ -8,15 +7,7 @@ def bl_check_beam():
def ascan(
motor,
scan_start,
scan_end,
steps,
exp_time,
datasource=None,
visual=True,
relative=False,
**kwargs,
motor, scan_start, scan_end, steps, exp_time, plot=None, visual=True, relative=False, **kwargs
):
"""Demo step scan with plotting
@@ -44,10 +35,10 @@ def ascan(
# Draw a simploe plot in the window
dock = window.add_dock(f"ScanDisplay {motor}")
plt1 = dock.add_widget("BECWaveformWidget")
plt1.plot(x_name=motor, y_name=datasource)
plt1.plot(x_name=motor, y_name=plot)
plt1.set_x_label(motor)
plt1.set_y_label(datasource)
plt1.add_dap(motor, datasource, dap="LinearModel")
plt1.set_y_label(plot)
plt1.add_dap(motor, plot, dap="LinearModel")
window.show()
print("Handing over to 'scans.line_scan'")
@@ -57,7 +48,7 @@ def ascan(
scan_end,
steps=steps,
exp_time=exp_time,
datasource=datasource,
plot=plot,
relative=relative,
**kwargs,
)
@@ -67,9 +58,7 @@ def ascan(
firt_par = plt1.get_dap_params()
else:
# Fitting without GUI
firt_par = bec.dap.LinearModel.fit(
s, motor.name, motor.name, datasource.name, datasource.name
)
firt_par = bec.dap.LinearModel.fit(s, motor.name, motor.name, plot.name, plot.name)
# # Some basic fit
# dkey = datasource.full_name

View File

@@ -16,6 +16,7 @@ dependencies = [
"bec_ipython_client",
"bec_lib",
"bec_server",
"bec_widgets",
"ophyd_devices",
"std_daq_client",
"rich",