refactoring
CI for debye_bec / test (push) Successful in 1m7s

This commit is contained in:
x01da
2026-05-18 10:38:01 +02:00
parent bda7a688e1
commit 6b5ff49b04
6 changed files with 1160 additions and 1212 deletions
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,174 @@
"""
Panel for user inputs of the digital twin widget
"""
# pylint: disable=E0611
from qtpy.QtWidgets import QLayout, QVBoxLayout, QWidget
from debye_bec.bec_widgets.widgets.qt_widgets import (
Button,
ComboBox,
Group,
InputNumberField,
NumberIndicator,
)
class InputPanel(QWidget):
"""Right-side control panel: input field, indicator, send, recording."""
def __init__(self, parent=None):
super().__init__(parent)
self._layout = QVBoxLayout(self)
self._layout.setSizeConstraint(QLayout.SetFixedSize) # type: ignore
# Adapt to reality
self.adapt_reality = Button(label_button="Adapt to reality", enabled=True)
# Energy
self.energy = InputNumberField(
"energy", "Energy", unit="eV", init=8979, decimals=0, single_step=100, ll=4000, hl=65000
)
# FE Slits Acceptance
self.sldi_hacc = InputNumberField(
"h_acc",
"Horizontal",
unit="mrad",
prefix="±",
init=0.25,
decimals=3,
single_step=0.01,
ll=-0.1,
hl=0.9,
)
self.sldi_vacc = InputNumberField(
"v_acc",
"Vertical",
unit="mrad",
prefix="±",
init=0.1,
decimals=3,
single_step=0.01,
ll=-0.1,
hl=0.5,
)
self.sldi_ass_group = Group("FE Slits Acceptance", [self.sldi_hacc, self.sldi_vacc])
# Collimating mirror
self.cm_stripe = ComboBox("cm_stripe", "Stripe", ["Si", "Rh", "Pt"])
self.cm_pitch = InputNumberField(
"cm_pitch",
"Pitch",
unit="mrad",
init=-2.391,
decimals=3,
single_step=0.01,
ll=-4.6,
hl=-1.2,
)
self.cm_pitch_critical = NumberIndicator("Critical Pitch", "mrad", decimals=3)
self.cm_refl = NumberIndicator("Reflectivity at x eV", "%", decimals=0)
self.cm_refl_harm = NumberIndicator("Reflectivity at x eV", "%", decimals=0)
self.cm_ass_group = Group(
"Collimating Mirror",
[
self.cm_stripe,
self.cm_pitch,
self.cm_pitch_critical,
self.cm_refl,
self.cm_refl_harm,
],
)
# Monochromator
self.mo1_mode = ComboBox("mo1_mode", "Mode", ["Monochromatic", "Pinkbeam"])
self.mo1_xtal = ComboBox("mo1_xtal", "Crystal", ["Si(111)", "Si(311)"])
self.mo1_bragg_angle = NumberIndicator("Bragg Angle", "deg", decimals=1)
self.mo1_eres = NumberIndicator("Energy Resolution", "eV", decimals=2)
self.mo1_ass_group = Group(
"Monochromator", [self.mo1_mode, self.mo1_xtal, self.mo1_bragg_angle, self.mo1_eres]
)
# Focusing Mirror
self.fm_stripe = ComboBox(
"fm_stripe", "Stripe", ["Rh (toroid)", "Rh (flat)", "Pt (toroid)", "Pt (flat)"]
)
self.fm_focus = ComboBox("fm_focus", "Focus Type", ["Manual", "Focused", "Defocused"])
self.fm_rotx = InputNumberField(
"fm_rotx",
"Incidence Angle",
unit="mrad",
init=-2.391,
decimals=3,
single_step=0.01,
ll=-10,
hl=2,
)
self.fm_focx = InputNumberField(
"fm_focx",
"Beam Size Horizontal",
unit="mm",
init=1,
decimals=1,
single_step=0.1,
ll=0,
hl=30,
)
self.fm_focy = InputNumberField(
"fm_focy",
"Beam Size Vertical",
unit="mm",
init=1,
decimals=1,
single_step=0.1,
ll=0,
hl=10,
)
self.fm_rotx_ideal = NumberIndicator("Incidence Angle for focused beam", "mrad", decimals=3)
self.fm_refl = NumberIndicator("Reflectivity at x eV", "%", decimals=0)
self.fm_refl_harm = NumberIndicator("Reflectivity at x eV", "%", decimals=0)
self.fm_ass_group = Group(
"Focusing Mirror",
[
self.fm_stripe,
self.fm_focus,
self.fm_rotx,
self.fm_focx,
self.fm_focy,
self.fm_rotx_ideal,
self.fm_refl,
self.fm_refl_harm,
],
)
# Sample
self.cm_fm_harm_suppr = NumberIndicator("Total Suppression Factor at x eV", "", decimals=0)
self.smpl = InputNumberField(
"smpl",
"Sample Position",
unit="mm",
init=23511,
decimals=0,
single_step=100,
ll=23000,
hl=30000,
)
# Assemble complete assitant group
self.input_group = Group(
"User Input",
[
self.adapt_reality,
self.energy,
self.sldi_ass_group,
self.cm_ass_group,
self.mo1_ass_group,
self.fm_ass_group,
self.cm_fm_harm_suppr,
self.smpl,
],
)
self._layout.addWidget(self.input_group)
self._layout.addStretch()
@@ -1,28 +1,37 @@
import time
import random
import threading
import time
from bec_lib import bec_logger
# import qtawesome as qta
from bec_qthemes import material_icon
from bec_widgets.utils.colors import get_accent_colors
from bec_lib import bec_logger
from qtpy.QtCore import Property, QObject, QPropertyAnimation, Qt, QThread, Signal
from qtpy.QtGui import QTransform
from qtpy.QtWidgets import (
QApplication,
QDoubleSpinBox,
QFrame,
QGroupBox,
QHBoxLayout,
QLabel,
QPushButton,
QVBoxLayout,
QWidget,
)
from debye_bec.devices.absorber import STATUS as ABS_STATUS
from qtpy.QtCore import Qt, QThread, Signal, QObject, Property, QPropertyAnimation
from qtpy.QtWidgets import (
QGroupBox, QHBoxLayout, QVBoxLayout, QLabel, QPushButton,
QDoubleSpinBox, QFrame, QWidget, QApplication
)
from qtpy.QtGui import QTransform
logger = bec_logger.logger
class Status:
IN_POSITION = "in_position" # green mdi.check-circle
IN_POSITION = "in_position" # green mdi.check-circle
NOT_IN_POSITION = "not_in_position" # orange mdi.close-circle
MOVING = "moving" # blue mdi.loading (spinning)
ERROR = "error" # red mdi.alert-circle
MOVING = "moving" # blue mdi.loading (spinning)
ERROR = "error" # red mdi.alert-circle
class StatusIcon(QWidget):
"""
@@ -33,10 +42,10 @@ class StatusIcon(QWidget):
ICON_SIZE = 20
_ICON_MAP = {
Status.IN_POSITION: ("check_circle", "#27ae60"),
Status.IN_POSITION: ("check_circle", "#27ae60"),
Status.NOT_IN_POSITION: ("cancel", "#e6d922"),
Status.ERROR: ("warning", "#e74c3c"),
Status.MOVING: ("cycle", "#2980b9"),
Status.ERROR: ("warning", "#e74c3c"),
Status.MOVING: ("cycle", "#2980b9"),
}
def __init__(self, parent=None):
@@ -76,7 +85,9 @@ class StatusIcon(QWidget):
self._status = status
icon_name, color = self._ICON_MAP[status]
icon = material_icon(icon_name, size=(self.ICON_SIZE, self.ICON_SIZE), color=color, convert_to_pixmap=True)
icon = material_icon(
icon_name, size=(self.ICON_SIZE, self.ICON_SIZE), color=color, convert_to_pixmap=True
)
self._current_pixmap_base = icon
if status == Status.MOVING:
@@ -85,14 +96,16 @@ class StatusIcon(QWidget):
self._spin_anim.stop()
self._label.setPixmap(icon)
class MotionWorker(QObject):
"""
Executes motion on the specified motor and includes some safety during
motion for certain motors.
"""
position_changed = Signal(float)
error = Signal(bool) # True = error
finished = Signal(bool) # True = reached target, False = stopped
error = Signal(bool) # True = error
finished = Signal(bool) # True = reached target, False = stopped
def __init__(self, dev, motor, target_pos: float):
super().__init__()
@@ -104,101 +117,104 @@ class MotionWorker(QObject):
def stop(self):
self._stop_flag.set()
# def run(self):
# logger.info(f'Would run motor {self.motor}')
# simulated_run_time = 3
# start = time.time()
# while (time.time() - start) < simulated_run_time:
# if self._stop_flag.is_set():
# break
# time.sleep(0.01)
# # self.motor.move(self._target, relative=False)
# # while self.motor.motor_is_moving.get():
# # if self._stop_flag.is_set():
# # self.motor.motor_stop()
# # self.position_changed.emit(self.motor.read[self.name]['value'])
# # time.sleep(0.1)
# self.finished.emit(True)
def run(self):
match self.motor:
case 'sldi_gapx' | 'sldi_gapy' | 'sldi_centerx' | 'sldi_centery':
case "sldi_gapx" | "sldi_gapy" | "sldi_centerx" | "sldi_centery":
self.motion()
case 'cm_trx':
self.motion(abs_closed=True, surveyed_axes=[
{'device': self.dev['cm_roty'], 'abs_tol': 0.05}
])
case 'cm_roty':
self.motion(abs_closed=True, surveyed_axes=[
{'device': self.dev['cm_trx'], 'abs_tol': 0.05}
])
case 'cm_try':
self.motion(abs_closed=True, surveyed_axes=[
{'device': self.dev['cm_rotx'], 'abs_tol': 0.05},
{'device': self.dev['cm_rotz'], 'abs_tol': 0.05},
])
case 'cm_rotx':
self.motion(abs_closed=True, surveyed_axes=[
{'device': self.dev['cm_try'], 'abs_tol': 0.05},
{'device': self.dev['cm_rotz'], 'abs_tol': 0.05},
])
case 'cm_rotz':
self.motion(abs_closed=True, surveyed_axes=[
{'device': self.dev['cm_try'], 'abs_tol': 0.05},
{'device': self.dev['cm_rotx'], 'abs_tol': 0.05},
])
case 'cm_bnd':
p1 = (1/(self.dev.cm_bnd_radius.read()['cm_bnd_radius']['value']*1e3) + 0.0284)/2e-6
p2 = (1/(self._target*1e3) + 0.0284)/2e-6
self._target = p2 - p1
self.motion(relative=True, rb=
{'device': self.dev['cm_bnd_radius']}
case "cm_trx":
self.motion(
abs_closed=True,
surveyed_axes=[{"device": self.dev["cm_roty"], "abs_tol": 0.05}],
)
case 'mo1_try' | 'mo1_trx' | 'mo1_roty':
case "cm_roty":
self.motion(
abs_closed=True, surveyed_axes=[{"device": self.dev["cm_trx"], "abs_tol": 0.05}]
)
case "cm_try":
self.motion(
abs_closed=True,
surveyed_axes=[
{"device": self.dev["cm_rotx"], "abs_tol": 0.05},
{"device": self.dev["cm_rotz"], "abs_tol": 0.05},
],
)
case "cm_rotx":
self.motion(
abs_closed=True,
surveyed_axes=[
{"device": self.dev["cm_try"], "abs_tol": 0.05},
{"device": self.dev["cm_rotz"], "abs_tol": 0.05},
],
)
case "cm_rotz":
self.motion(
abs_closed=True,
surveyed_axes=[
{"device": self.dev["cm_try"], "abs_tol": 0.05},
{"device": self.dev["cm_rotx"], "abs_tol": 0.05},
],
)
case "cm_bnd":
p1 = (
1 / (self.dev.cm_bnd_radius.read()["cm_bnd_radius"]["value"] * 1e3) + 0.0284
) / 2e-6
p2 = (1 / (self._target * 1e3) + 0.0284) / 2e-6
self._target = p2 - p1
self.motion(relative=True, rb={"device": self.dev["cm_bnd_radius"]})
case "mo1_try" | "mo1_trx" | "mo1_roty":
self.motion(abs_closed=True)
case 'mo1_bragg_angle':
case "mo1_bragg_angle":
self.motion()
case 'sl1_centery' | 'sl1_gapy' | 'bm1_try':
case "sl1_centery" | "sl1_gapy" | "bm1_try":
self.motion()
case 'fm_trx':
self.motion(abs_closed=True, surveyed_axes=[
{'device': self.dev['fm_roty'], 'abs_tol': 0.05}
])
case 'fm_roty':
self.motion(abs_closed=True, surveyed_axes=[
{'device': self.dev['fm_trx'], 'abs_tol': 0.05}
])
case 'fm_try':
self.motion(abs_closed=True, surveyed_axes=[
{'device': self.dev['fm_rotx'], 'abs_tol': 0.05},
{'device': self.dev['fm_rotz'], 'abs_tol': 0.05},
])
case 'fm_rotx':
self.motion(abs_closed=True, surveyed_axes=[
{'device': self.dev['fm_try'], 'abs_tol': 0.05},
{'device': self.dev['fm_rotz'], 'abs_tol': 0.05},
])
case 'fm_rotz':
self.motion(abs_closed=True, surveyed_axes=[
{'device': self.dev['fm_try'], 'abs_tol': 0.05},
{'device': self.dev['fm_rotx'], 'abs_tol': 0.05},
])
case 'fm_bnd':
p1 = (1/(self.dev.fm_bnd_radius.read()['fm_bnd_radius']['value']*1e3) + 4.28e-5)/1.84e-9
p2 = (1/(self._target*1e3) + 4.28e-5)/1.84e-9
self._target = p2 - p1
self.motion(relative=True, rb=
{'device': self.dev['fm_bnd_radius']}
case "fm_trx":
self.motion(
abs_closed=True,
surveyed_axes=[{"device": self.dev["fm_roty"], "abs_tol": 0.05}],
)
case 'sl2_centery' | 'sl2_gapy' | 'bm2_try':
case "fm_roty":
self.motion(
abs_closed=True, surveyed_axes=[{"device": self.dev["fm_trx"], "abs_tol": 0.05}]
)
case "fm_try":
self.motion(
abs_closed=True,
surveyed_axes=[
{"device": self.dev["fm_rotx"], "abs_tol": 0.05},
{"device": self.dev["fm_rotz"], "abs_tol": 0.05},
],
)
case "fm_rotx":
self.motion(
abs_closed=True,
surveyed_axes=[
{"device": self.dev["fm_try"], "abs_tol": 0.05},
{"device": self.dev["fm_rotz"], "abs_tol": 0.05},
],
)
case "fm_rotz":
self.motion(
abs_closed=True,
surveyed_axes=[
{"device": self.dev["fm_try"], "abs_tol": 0.05},
{"device": self.dev["fm_rotx"], "abs_tol": 0.05},
],
)
case "fm_bnd":
p1 = (
1 / (self.dev.fm_bnd_radius.read()["fm_bnd_radius"]["value"] * 1e3) + 4.28e-5
) / 1.84e-9
p2 = (1 / (self._target * 1e3) + 4.28e-5) / 1.84e-9
self._target = p2 - p1
self.motion(relative=True, rb={"device": self.dev["fm_bnd_radius"]})
case "sl2_centery" | "sl2_gapy" | "bm2_try":
self.motion()
case 'ot_try' | 'ot_rotx' | 'ot_es1_trz':
case "ot_try" | "ot_rotx" | "ot_es1_trz":
self.motion()
case _:
logger.warning(f'Motor {self.motor} not integrated in digital twin!')
logger.warning(f"Motor {self.motor} not integrated in digital twin!")
def motion(self, abs_closed=False, relative=False, rb=None, surveyed_axes = None):
def motion(self, abs_closed=False, relative=False, rb=None, surveyed_axes=None):
"""
Moves an axis while surverying a set of axes (if set).
Example surveyed_axes:
@@ -215,30 +231,37 @@ class MotionWorker(QObject):
status.wait(timeout=5)
if surveyed_axes is not None:
for surv_ax in surveyed_axes:
surv_ax['name'] = surv_ax['device'].dotted_name
surv_ax['old_value'] = surv_ax['device'].read(cached=True)[surv_ax['name']]['value']
surv_ax["name"] = surv_ax["device"].dotted_name
surv_ax["old_value"] = surv_ax["device"].read(cached=True)[surv_ax["name"]]["value"]
if rb is not None:
rb['name'] = rb['device'].dotted_name
self.dev[self.motor].move(self._target, relative=relative)
time.sleep(0.5)
while self.dev[self.motor].motor_is_moving.get():
rb["name"] = rb["device"].dotted_name
status = self.dev[self.motor].move(self._target, relative=relative)
last_check = time.time()
update_interval = 0.1
while status.status == "RUNNING":
now = time.time()
if time.time() - last_check < update_interval:
time.sleep(0.01)
last_check = now
if self._stop_flag.is_set():
self.dev[self.motor].stop()
self._stop_flag.clear()
if rb is not None:
self.position_changed.emit(rb['device'].read(cached=True)[rb['name']]['value'])
self.position_changed.emit(rb["device"].read(cached=True)[rb["name"]]["value"])
else:
self.position_changed.emit(self.dev[self.motor].read(cached=True)[self.motor]['value'])
self.position_changed.emit(
self.dev[self.motor].read(cached=True)[self.motor]["value"]
)
if surveyed_axes is not None:
for surv_ax in surveyed_axes:
fb = surv_ax['device'].read(cached=True)[surv_ax['name']]['value']
if abs(fb - surv_ax['old_value']) > surv_ax['abs_tol']:
fb = surv_ax["device"].read(cached=True)[surv_ax["name"]]["value"]
if abs(fb - surv_ax["old_value"]) > surv_ax["abs_tol"]:
self.dev[self.motor].stop()
self.error.emit(1)
break
time.sleep(0.1)
self.finished.emit(True)
class MoveWidget(QWidget):
"""
One motor stage control group containing:
@@ -248,7 +271,7 @@ class MoveWidget(QWidget):
- Start / Stop button
"""
def __init__(self, dev, motor, label: str = '', unit=None, decimals=3, deadband=0.0):
def __init__(self, dev, motor, label: str = "", unit=None, decimals=3, deadband=0.0):
super().__init__()
self.fb = 0.0
self.target = 0
@@ -276,12 +299,12 @@ class MoveWidget(QWidget):
layout.addWidget(self.label)
# Target
self.target_label = QLabel('-')
self.target_label = QLabel("-")
self.target_label.setFixedWidth(100)
layout.addWidget(self.target_label)
# Feedback
self.fb_label = QLabel('-')
self.fb_label = QLabel("-")
self.fb_label.setFixedWidth(100)
layout.addWidget(self.fb_label)
@@ -297,7 +320,7 @@ class MoveWidget(QWidget):
self.btn_action.setFixedHeight(20)
self.btn_action.clicked.connect(self._on_button_clicked)
layout.addWidget(self.btn_action)
self.btn_mode = 'start'
self.btn_mode = "start"
self._apply_button_style("start")
@@ -306,18 +329,18 @@ class MoveWidget(QWidget):
def apply_theme(self, theme=None):
if theme is None:
app = QApplication.instance()
theme = app.theme.theme # type: ignore
theme = app.theme.theme # type: ignore
if theme == "light":
self.text_color = {'target': (79, 163, 224), 'fb': (240, 128, 60)}
else: # dark theme
self.text_color = {'target': (26, 111, 173), 'fb': (212, 83, 10)}
r, g, b = self.text_color['target']
self.target_label.setStyleSheet(f'QLabel {{color: rgb({r}, {g}, {b})}}')
r, g, b = self.text_color['fb']
self.fb_label.setStyleSheet(f'QLabel {{color: rgb({r}, {g}, {b})}}')
self.text_color = {"target": (79, 163, 224), "fb": (240, 128, 60)}
else: # dark theme
self.text_color = {"target": (26, 111, 173), "fb": (212, 83, 10)}
r, g, b = self.text_color["target"]
self.target_label.setStyleSheet(f"QLabel {{color: rgb({r}, {g}, {b})}}")
r, g, b = self.text_color["fb"]
self.fb_label.setStyleSheet(f"QLabel {{color: rgb({r}, {g}, {b})}}")
if self.btn_mode == 'start':
if self.btn_mode == "start":
self.btn_action.setStyleSheet(
f"QPushButton {{background-color: {get_accent_colors().success.name()}; color: white;}}"
)
@@ -328,18 +351,18 @@ class MoveWidget(QWidget):
def set_target(self, target):
self.target = target
text = f'{target:.{int(self.decimals)}f}'
text = f"{target:.{int(self.decimals)}f}"
if self.unit is not None:
text = text + ' ' + self.unit
text = text + " " + self.unit
self.target_label.setText(text)
self._on_target_or_fb_changed()
def set_feedback(self, fb):
if self.status != Status.MOVING:
self.fb = fb
text = f'{fb:.{int(self.decimals)}f}'
text = f"{fb:.{int(self.decimals)}f}"
if self.unit is not None:
text = text + ' ' + self.unit
text = text + " " + self.unit
self.fb_label.setText(text)
self._on_target_or_fb_changed()
@@ -348,13 +371,13 @@ class MoveWidget(QWidget):
if mode == "start":
self.btn_action.setText("Move")
self.btn_action.setStyleSheet(
f"QPushButton {{background-color: {get_accent_colors().success.name()}; color: white;}}"
)
f"QPushButton {{background-color: {get_accent_colors().success.name()}; color: white;}}"
)
else: # stop
self.btn_action.setText("Stop")
self.btn_action.setStyleSheet(
f"QPushButton {{background-color: {get_accent_colors().emergency.name()}; color: white;}}"
)
f"QPushButton {{background-color: {get_accent_colors().emergency.name()}; color: white;}}"
)
def _set_status(self, status: str):
self.status = status
@@ -408,9 +431,9 @@ class MoveWidget(QWidget):
def _on_position_changed(self, pos: float):
self.fb = pos
text = f'{pos:.{int(self.decimals)}f}'
text = f"{pos:.{int(self.decimals)}f}"
if self.unit is not None:
text = text + ' ' + self.unit
text = text + " " + self.unit
self.fb_label.setText(text)
def _on_motion_finished(self, reached: bool):
@@ -437,12 +460,13 @@ class MoveWidget(QWidget):
self._thread.quit()
self._thread.wait(2000) # max 2 s grace period
class AbsorberWidget(QWidget):
"""
Control of the frontend absorber (only open)
"""
def __init__(self, absorber, label: str = 'Absorber'):
def __init__(self, absorber, label: str = "Absorber"):
super().__init__()
self.absorber = absorber
self.fb = False
@@ -460,17 +484,17 @@ class AbsorberWidget(QWidget):
layout.addWidget(self.label)
# Blank
self.blank_label = QLabel('')
self.blank_label = QLabel("")
self.blank_label.setFixedWidth(100)
layout.addWidget(self.blank_label)
# Feedback
self.fb_label = QLabel('-')
self.fb_label = QLabel("-")
self.fb_label.setFixedWidth(100)
layout.addWidget(self.fb_label)
# Blank icon
self.blank_icon = QLabel('')
self.blank_icon = QLabel("")
self.blank_icon.setFixedWidth(30)
self.blank_icon.setContentsMargins(0, 0, 10, 0)
layout.addWidget(self.blank_icon)
@@ -485,15 +509,11 @@ class AbsorberWidget(QWidget):
def set_feedback(self, fb: bool):
self.fb = fb
if fb:
self.fb_label.setText('Open')
self.fb_label.setStyleSheet(
f"QLabel {{color: {get_accent_colors().success.name()}}}"
)
self.fb_label.setText("Open")
self.fb_label.setStyleSheet(f"QLabel {{color: {get_accent_colors().success.name()}}}")
else:
self.fb_label.setText('Closed')
self.fb_label.setStyleSheet(
f"QLabel {{color: {get_accent_colors().emergency.name()}}}"
)
self.fb_label.setText("Closed")
self.fb_label.setStyleSheet(f"QLabel {{color: {get_accent_colors().emergency.name()}}}")
def enable_open(self, enable: bool = False):
if enable:
@@ -0,0 +1,220 @@
"""
Panel to move an axis to a certain position
"""
# pylint: disable=E0611
from qtpy.QtWidgets import QLayout, QVBoxLayout, QWidget
from debye_bec.bec_widgets.widgets.digital_twin.move_widget import AbsorberWidget, MoveWidget
from debye_bec.bec_widgets.widgets.qt_widgets import Group
class MoverPanel(QWidget):
def __init__(self, dev, parent=None):
super().__init__(parent)
self._layout = QVBoxLayout(self)
self._layout.setSizeConstraint(QLayout.SetFixedSize) # type: ignore
self.mover_widgets = []
# FE Slits
self.sldi_gapx = MoveWidget(
dev=dev, motor="sldi_gapx", label="GAPX", unit="mm", decimals=2, deadband=0.01
)
self.mover_widgets.append(self.sldi_gapx)
self.sldi_gapy = MoveWidget(
dev=dev, motor="sldi_gapy", label="GAPY", unit="mm", decimals=2, deadband=0.01
)
self.mover_widgets.append(self.sldi_gapy)
self.sldi_mov_group = Group("FE Slits", [self.sldi_gapx, self.sldi_gapy])
# Absorber
self.abs = AbsorberWidget(absorber=dev.abs, label="")
self.abs_group = Group("Absorber", [self.abs])
# Collimating mirror
self.cm_trx = MoveWidget(
dev=dev, motor="cm_trx", label="TRX", unit="mm", decimals=2, deadband=0.01
)
self.mover_widgets.append(self.cm_trx)
self.cm_try = MoveWidget(
dev=dev, motor="cm_try", label="TRY", unit="mm", decimals=2, deadband=0.01
)
self.mover_widgets.append(self.cm_try)
self.cm_bnd = MoveWidget(
dev=dev, motor="cm_bnd", label="BENDER", unit="km", decimals=2, deadband=0.2
)
self.mover_widgets.append(self.cm_bnd)
self.cm_rotx = MoveWidget(
dev=dev, motor="cm_rotx", label="PITCH", unit="mrad", decimals=3, deadband=0.01
)
self.mover_widgets.append(self.cm_rotx)
self.cm_mov_group = Group(
"Collimating Mirror", [self.cm_trx, self.cm_try, self.cm_bnd, self.cm_rotx]
)
# Monochromator
self.mo1_bragg_angle = MoveWidget(
dev=dev,
motor="mo1_bragg_angle",
label="Bragg Angle",
unit="deg",
decimals=3,
deadband=0.01,
)
self.mover_widgets.append(self.mo1_bragg_angle)
self.mo1_trx = MoveWidget(
dev=dev, motor="mo1_trx", label="TRX", unit="mm", decimals=2, deadband=0.01
)
self.mover_widgets.append(self.mo1_trx)
self.mo1_try = MoveWidget(
dev=dev, motor="mo1_try", label="TRY", unit="mm", decimals=2, deadband=0.01
)
self.mover_widgets.append(self.mo1_try)
self.mo1_mov_group = Group(
"Monochromator", [self.mo1_bragg_angle, self.mo1_trx, self.mo1_try]
)
# OP Slits 1
self.sl1_centery = MoveWidget(
dev=dev, motor="sl1_centery", label="CENTERY", unit="mm", decimals=2, deadband=0.1
)
self.mover_widgets.append(self.sl1_centery)
self.sl1_gapy = MoveWidget(
dev=dev, motor="sl1_gapy", label="GAPY", unit="mm", decimals=2, deadband=0.1
)
self.mover_widgets.append(self.sl1_gapy)
self.sl1_mov_group = Group("OP Slits 1", [self.sl1_centery, self.sl1_gapy])
# OP Beam Monitor 1
self.bm1_try = MoveWidget(
dev=dev, motor="bm1_try", label="TRY", unit="mm", decimals=2, deadband=0.1
)
self.mover_widgets.append(self.bm1_try)
self.bm1_mov_group = Group("OP Beam Monitor 1", [self.bm1_try])
# Focusing Mirror
self.fm_trx = MoveWidget(
dev=dev, motor="fm_trx", label="TRX", unit="mm", decimals=2, deadband=0.01
)
self.mover_widgets.append(self.fm_trx)
self.fm_try = MoveWidget(
dev=dev, motor="fm_try", label="TRY", unit="mm", decimals=2, deadband=0.01
)
self.mover_widgets.append(self.fm_try)
self.fm_bnd = MoveWidget(
dev=dev, motor="fm_bnd", label="BENDER", unit="km", decimals=2, deadband=0.2
)
self.mover_widgets.append(self.fm_bnd)
self.fm_rotx = MoveWidget(
dev=dev, motor="fm_rotx", label="PITCH", unit="mrad", decimals=3, deadband=0.01
)
self.mover_widgets.append(self.fm_rotx)
self.fm_roty = MoveWidget(
dev=dev, motor="fm_roty", label="YAW", unit="mrad", decimals=3, deadband=0.01
)
self.mover_widgets.append(self.fm_roty)
self.fm_rotz = MoveWidget(
dev=dev, motor="fm_rotz", label="ROLL", unit="mrad", decimals=3, deadband=0.01
)
self.mover_widgets.append(self.fm_rotz)
self.fm_mov_group = Group(
"Focusing Mirror",
[self.fm_trx, self.fm_try, self.fm_bnd, self.fm_rotx, self.fm_roty, self.fm_rotz],
)
# OP Slits 2
self.sl2_centery = MoveWidget(
dev=dev, motor="sl2_centery", label="CENTERY", unit="mm", decimals=2, deadband=0.1
)
self.mover_widgets.append(self.sl2_centery)
self.sl2_gapy = MoveWidget(
dev=dev, motor="sl2_gapy", label="GAPY", unit="mm", decimals=2, deadband=0.1
)
self.mover_widgets.append(self.sl2_gapy)
self.sl2_mov_group = Group("OP Slits 2", [self.sl2_centery, self.sl2_gapy])
# OP Beam Monitor 2
self.bm2_try = MoveWidget(
dev=dev, motor="bm2_try", label="TRY", unit="mm", decimals=2, deadband=0.1
)
self.mover_widgets.append(self.bm2_try)
self.bm2_mov_group = Group("OP Beam Monitor 2", [self.bm2_try])
# Optical Table
self.ot_try = MoveWidget(
dev=dev, motor="ot_try", label="TRY", unit="mm", decimals=2, deadband=0.2
)
self.mover_widgets.append(self.ot_try)
self.ot_rotx = MoveWidget(
dev=dev, motor="ot_rotx", label="ROTX", unit="mrad", decimals=3, deadband=0.05
)
self.mover_widgets.append(self.ot_rotx)
self.ot_mov_group = Group("Optical Table", [self.ot_try, self.ot_rotx])
# Experimental Station 0
self.es0wi_try = MoveWidget(
dev=dev, motor="es0wi_try", label="ES0 WI", unit="mm", decimals=0, deadband=0.1
)
self.mover_widgets.append(self.es0wi_try)
self.es0_mov_group = Group("Expperimental Station 0", [self.es0wi_try])
# Experimental Station 1
self.ot_es1_trz = MoveWidget(
dev=dev, motor="ot_es1_trz", label="ES1 TRZ", unit="mm", decimals=0, deadband=5
)
self.mover_widgets.append(self.ot_es1_trz)
self.es1_mov_group = Group("Expperimental Station 1", [self.ot_es1_trz])
# Assemble complete mover group
self.mover_group = Group(
"Mover",
[
self.sldi_mov_group,
self.abs_group,
self.cm_mov_group,
self.mo1_mov_group,
self.sl1_mov_group,
self.bm1_mov_group,
self.fm_mov_group,
self.sl2_mov_group,
self.bm2_mov_group,
self.ot_mov_group,
self.es0_mov_group,
self.es1_mov_group,
],
)
self._layout.addWidget(self.mover_group)
self._layout.addStretch()
def apply_theme(self, theme):
for widget in self.mover_widgets:
widget.apply_theme(theme)
@@ -0,0 +1,303 @@
"""
Two plot classes to plot side-view and surface-view
"""
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger
# pylint: disable=E0611
from qtpy.QtCore import Qt
from qtpy.QtGui import QBrush, QColor
# pylint: disable=E0611
from qtpy.QtWidgets import QApplication, QHBoxLayout, QVBoxLayout, QWidget
from debye_bec.bec_widgets.widgets.digital_twin.calc_varia import (
mirror_surface_geometries,
mo_surface_geometries,
pipe_geometries,
wall_geometries,
)
from debye_bec.bec_widgets.widgets.qt_widgets import Group
logger = bec_logger.logger
class SurfacePlots(QWidget):
"""Plot widget with two curves and legend."""
def __init__(self, parent=None):
super().__init__(parent=parent)
self._layout = QHBoxLayout(self)
self.surfaces = {
"assistant": {
"cm": {"x": [], "y": []},
"mo1_1": {"x": [], "y": []},
"mo1_2": {"x": [], "y": []},
"fm": {"x": [], "y": []},
},
"reality": {
"cm": {"x": [], "y": []},
"mo1_1": {"x": [], "y": []},
"mo1_2": {"x": [], "y": []},
"fm": {"x": [], "y": []},
},
}
self.plots = {"fm": {}, "mo1_2": {}, "mo1_1": {}, "cm": {}}
self.color_impenetrable = (0, 0, 0)
self.colors = [(255, 255, 0), (255, 0, 255)]
self.text_color = (255, 255, 255)
# Create plot widgets
for name, widget in self.plots.items():
plot_widget = pg.PlotWidget()
plot_widget.getAxis("bottom").enableAutoSIPrefix(False)
plot_group = Group("Surface " + name, [plot_widget])
plot_widget.setLabel("left", "Z [mm]")
plot_widget.setLabel("bottom", "X [mm]")
plot_widget.setMouseEnabled(x=False, y=False)
plot_widget.setMenuEnabled(False)
plot_widget.hideButtons()
widget["widget"] = plot_widget
self._layout.addWidget(plot_group)
# Create surfaces
for idx, scene in enumerate(self.surfaces):
for name, _ in self.surfaces[scene].items():
if scene in "assistant":
brush = QBrush(QColor(*self.colors[idx], 255), Qt.DiagCrossPattern)
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=1, style=Qt.DashLine)
z_value = 2
else:
brush = QBrush(QColor(*self.colors[idx], 255))
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=1)
z_value = 1
widget = self.plots[name]
self.plots[name][scene] = widget["widget"].plot(
[], [], pen=pen, name=scene, brush=brush, fillLevel=0
)
self.plots[name][scene].setZValue(z_value)
self.walls = []
self.texts = []
self.plot_walls()
self.apply_theme()
def apply_theme(self, theme=None):
if theme is None:
app = QApplication.instance()
theme = app.theme.theme # type: ignore
bg_color = pg.getConfigOption("background")
fg_color = pg.getConfigOption("foreground")
for _, plot in self.plots.items():
# Background
plot["widget"].setBackground(bg_color)
# Axes (tick marks, tick labels, axis line)
for axis in ["left", "bottom", "right", "top"]:
ax = plot["widget"].getAxis(axis)
ax.setPen(pg.mkPen(color=fg_color))
ax.setTextPen(pg.mkPen(color=fg_color))
if theme == "light":
self.color_impenetrable = (30, 30, 30)
self.colors = [(79, 163, 224), (240, 128, 60)]
self.text_color = (255, 255, 255)
else: # dark theme
self.color_impenetrable = (180, 180, 180)
self.colors = [(26, 111, 173), (212, 83, 10)]
self.text_color = (0, 0, 0)
for idx, scene in enumerate(self.surfaces):
for name, _ in self.surfaces[scene].items():
if scene in "assistant":
brush = QBrush(QColor(*self.colors[idx], 255), Qt.DiagCrossPattern)
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=1, style=Qt.DashLine)
else:
brush = QBrush(QColor(*self.colors[idx], 255))
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=0)
self.plots[name][scene].setPen(pen)
self.plots[name][scene].setBrush(brush)
for wall in self.walls:
wall.setPen(pg.mkPen(color=self.color_impenetrable, width=2))
wall.setBrush(
pg.QtGui.QBrush(pg.QtGui.QColor(*self.color_impenetrable))
) # pylint: disable=E1101
for text in self.texts:
text.setColor(self.text_color)
def plot_walls(self):
def plot_surface(widget, surfaces):
for name, surface in surfaces.items():
rect = pg.QtWidgets.QGraphicsRectItem(*surface) # pylint: disable=E1101
rect.setBrush(
pg.QtGui.QBrush(pg.QtGui.QColor(*self.color_impenetrable))
) # pylint: disable=E1101
rect.setPen(pg.mkPen(color=self.color_impenetrable, width=2))
widget.addItem(rect)
text = pg.TextItem(name, color=self.text_color, anchor=(0.5, 0.5))
widget.addItem(text)
text.setPos(surface[0] + surface[2] / 2, surface[1] + surface[3] / 2)
text.setZValue(10)
self.walls.append(rect)
self.texts.append(text)
for name, plot in self.plots.items():
if name in "cm":
plot_surface(plot["widget"], mirror_surface_geometries("cm"))
elif name in "mo1_1":
plot_surface(plot["widget"], mo_surface_geometries("mo1", 0))
elif name in "mo1_2":
plot_surface(plot["widget"], mo_surface_geometries("mo1", 1))
elif name in "fm":
plot_surface(plot["widget"], mirror_surface_geometries("fm_flat"))
plot_surface(plot["widget"], mirror_surface_geometries("fm_toroid"))
else:
raise Exception(f"Plot {name} not found!")
for name, plot in self.plots.items():
plot["widget"].disableAutoRange()
def update_surfaces(self, scene, data):
self.surfaces[scene] = data
for name, device in self.surfaces[scene].items():
plot = self.plots[name][scene]
x = np.array(device["x"] + [device["x"][0]]) if len(device["x"]) != 0 else np.array([])
y = np.array(device["y"] + [device["y"][0]]) if len(device["y"]) != 0 else np.array([])
plot.setData(x=x, y=y)
class SideviewPlot(QWidget):
"""Plot widget with two curves and legend."""
def __init__(self, parent=None):
super().__init__(parent=parent)
self._layout = QVBoxLayout(self)
# self._layout.setSizeConstraint(QLayout.SetFixedSize) # type: ignore
self.plot_widget = pg.PlotWidget()
self.plot_widget.getAxis("bottom").enableAutoSIPrefix(False)
self.plot_widget.invertX(True)
self.plot_widget.addLegend()
self.color_impenetrable = (0, 0, 0)
self.colors = [(255, 255, 0), (255, 0, 255)]
self.data = {
"assistant": {"x": [0, 1000, 2000], "y": [0, 20, 30]},
"reality": {"x": [0, 1000, 2000], "y": [0, 15, 50]},
}
self.plots = {}
self.pipes = []
self.walls = []
for idx, scene in enumerate(self.data.keys()):
if scene in "assistant":
pen = pg.mkPen(color=self.colors[idx], width=2, style=Qt.DotLine)
z_value = 2
else:
pen = pg.mkPen(color=self.colors[idx], width=2)
z_value = 1
self.plots[scene] = self.plot_widget.plot([], [], pen=pen, name=scene)
self.plots[scene].setZValue(z_value)
self.plot_group = Group("Side View", [self.plot_widget])
self.plot_widget.setLabel("left", "Height [mm]")
self.plot_widget.setLabel("bottom", "Distance [mm]")
self.plot_widget.setMouseEnabled(x=False, y=False)
self.plot_widget.setXRange(0, 25000, padding=0.1)
self.plot_widget.setYRange(-20, 120, padding=0.1)
self.plot_widget.setMenuEnabled(False)
self.plot_widget.hideButtons()
self._layout.addWidget(self.plot_group)
self._layout.addStretch()
self.plot_vacuum_pipes()
self.plot_walls()
self.apply_theme()
def apply_theme(self, theme=None):
if theme is None:
app = QApplication.instance()
theme = app.theme.theme # type: ignore
bg_color = pg.getConfigOption("background")
fg_color = pg.getConfigOption("foreground")
# Background
self.plot_widget.setBackground(bg_color)
# Axes (tick marks, tick labels, axis line)
for axis in ["left", "bottom", "right", "top"]:
ax = self.plot_widget.getAxis(axis)
ax.setPen(pg.mkPen(color=fg_color))
ax.setTextPen(pg.mkPen(color=fg_color))
if theme == "light":
self.color_impenetrable = (30, 30, 30)
self.colors = [(79, 163, 224), (240, 128, 60)]
self.text_color = (255, 255, 255)
else: # dark theme
self.color_impenetrable = (180, 180, 180)
self.colors = [(26, 111, 173), (212, 83, 10)]
self.text_color = (0, 0, 0)
for idx, scene in enumerate(self.data):
if scene in "assistant":
brush = QBrush(QColor(*self.colors[idx], 255), Qt.DiagCrossPattern)
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=3, style=Qt.DotLine)
else:
brush = QBrush(QColor(*self.colors[idx], 255))
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=3)
self.plots[scene].setPen(pen)
self.plots[scene].setBrush(brush)
for wall in self.walls:
wall.setPen(pg.mkPen(color=self.color_impenetrable, width=3))
wall.setBrush(
pg.QtGui.QBrush(pg.QtGui.QColor(*self.color_impenetrable))
) # pylint: disable=E1101
for pipe in self.pipes:
pipe.setPen(pg.mkPen(color=self.color_impenetrable, width=3))
def plot_vacuum_pipes(self):
pipes = pipe_geometries()
for pipe in pipes:
self.pipes.append(
self.plot_widget.plot(
x=pipe["x"], y=pipe["y"], pen=pg.mkPen(color=self.color_impenetrable, width=2)
)
)
def plot_walls(self):
walls = wall_geometries()
for wall in walls:
rect = pg.QtWidgets.QGraphicsRectItem(*wall) # pylint: disable=E1101
rect.setBrush(
pg.QtGui.QBrush(pg.QtGui.QColor(*self.color_impenetrable))
) # pylint: disable=E1101
rect.setPen(pg.mkPen(color=self.color_impenetrable, width=2))
self.plot_widget.addItem(rect)
self.walls.append(rect)
def update_curves(self, scene, data):
self.data[scene] = data
plot = self.plots[scene]
plot.setData(x=self.data[scene]["x"], y=self.data[scene]["y"])
@@ -0,0 +1,27 @@
"""
Settings panel for the digital twin widget
"""
# pylint: disable=E0611
from qtpy.QtWidgets import QLayout, QVBoxLayout, QWidget
from debye_bec.bec_widgets.widgets.qt_widgets import Button, Group
class SettingsPanel(QWidget):
"""Right-side control panel: input field, indicator, send, recording."""
def __init__(self, parent=None):
super().__init__(parent)
self._layout = QVBoxLayout(self)
self._layout.setSizeConstraint(QLayout.SetFixedSize) # type: ignore
# Reload offsets
self.reload_offsets = Button(label="Reload Offsets", label_button="Reload", enabled=True)
self.unload_offsets = Button(label="Unload Offsets", label_button="Unload", enabled=True)
# Assemble complete offset group
self.offset_group = Group("Axes Offsets", [self.reload_offsets, self.unload_offsets])
self._layout.addWidget(self.offset_group)
self._layout.addStretch()