Feat/widget development #89

Open
appel_c wants to merge 4 commits from feat/widget-development into main
14 changed files with 692 additions and 195 deletions
@@ -178,8 +178,11 @@ class DataViewer(BECWidget, QWidget):
scan_data = self.client.history[-n].metadata["bec"] # type: ignore
scan_number = scan_data["scan_number"]
scan_name = scan_data["scan_name"]
comment = scan_data["metadata"]["user_metadata"]["comment"]
sample_name = scan_data["metadata"]["user_metadata"]["sample_name"]
if "metadata" in scan_data:
comment = scan_data["metadata"]["user_metadata"]["comment"]
sample_name = scan_data["metadata"]["user_metadata"]["sample_name"]
else:
comment, sample_name = "", ""
status = scan_data["status"]
self.history.append(
{
@@ -6,12 +6,12 @@ import numpy as np
from bec_lib import bec_logger
from .. import parameters as bl
from ..types import ConfigDict
from ..types import BeamlineId, ConfigDict
logger = bec_logger.logger
def calc_positions(cfg: ConfigDict) -> dict[str, dict[str, float]]:
def calc_positions(beamline: BeamlineId, cfg: ConfigDict) -> dict[str, dict[str, float]]:
"""
Calculates the positions of axes based on a beamline config.
@@ -150,7 +150,7 @@ def calc_positions(cfg: ConfigDict) -> dict[str, dict[str, float]]:
d = bl.opSlits1.center[1] - bl.cm.center[1] - dz
sl1_beam_height = d * np.tan(2 * cfg["cm_pitch"]) + beam_offset_mo1
pos["sl1_centery"] = {"value": sl1_beam_height}
pos["sl1_gapy"] = {"value": beam_vs + 1} # Add 0.5 mm space on both sides of the beam
pos["sl1_gapy"] = {"value": beam_vs}
## Beam Monitor 1
d = bl.opBM1.center[1] - bl.cm.center[1] - dz
@@ -226,10 +226,11 @@ def calc_positions(cfg: ConfigDict) -> dict[str, dict[str, float]]:
pos["fm_rotz"] = {"value": 0}
## Slits 2
d = bl.opSlits2.center[1] - bl.fm.center[1]
sl2_beam_height = fm_beam_height - d * np.tan(-(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"]))
pos["sl2_centery"] = {"value": sl2_beam_height}
pos["sl2_gapy"] = {"value": beam_vs + 1} # Add 0.5 mm space on both sides of the beam
if hasattr(bl, "opSlits2"):
d = bl.opSlits2.center[1] - bl.fm.center[1]
sl2_beam_height = fm_beam_height - d * np.tan(-(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"]))
pos["sl2_centery"] = {"value": sl2_beam_height}
pos["sl2_gapy"] = {"value": beam_vs}
## Beam Monitor 2
d = bl.opBM2.center[1] - bl.fm.center[1]
@@ -238,22 +239,59 @@ def calc_positions(cfg: ConfigDict) -> dict[str, dict[str, float]]:
## Optical Table
# TRY
d = bl.ehWindow.center[1] - bl.fm.center[1]
ot_height = fm_beam_height - d * np.tan(-(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"]))
pos["ot_try"] = {"value": ot_height}
if beamline == "x01da":
# TRY
d = bl.ehWindow.center[1] - bl.fm.center[1]
ot_height = fm_beam_height - d * np.tan(-(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"]))
pos["ot_try"] = {"value": ot_height}
# Pitch
ot_pitch = -(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"])
pos["ot_rotx"] = {"value": ot_pitch * 1e3}
# Pitch
ot_pitch = -(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"])
pos["ot_rotx"] = {"value": ot_pitch * 1e3}
# TRZ ES1
ot_es1_trz = cfg["smpl"]
pos["ot_es1_trz"] = {"value": ot_es1_trz}
# TRZ ES1
ot_es1_trz = cfg["smpl"]
pos["ot_es1_trz"] = {"value": ot_es1_trz}
# ES0 exit window
pos["es0wi_try"] = {
"value": 5
} # At 5mm, the middle of the window is 500 mm from the table (neutral position)
# ES0 exit window
pos["es0wi_try"] = {
"value": 5
} # At 5mm, the middle of the window is 500 mm from the table (neutral position)
else:
# Exit window height
d = bl.ehWindow.center[1] - bl.fm.center[1]
es0wi_try = fm_beam_height - d * np.tan(-(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"]))
pos["es0wi_try"] = {"value": es0wi_try}
# ES1 table height
d = bl.es1.center[1] - bl.fm.center[1]
es1_try = fm_beam_height - d * np.tan(-(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"]))
pos["es1_try"] = {"value": es1_try}
# IC0 height
d = bl.es1ic0.center[1] - bl.fm.center[1]
es1ic0_try = (
fm_beam_height - d * np.tan(-(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"])) - es1_try
)
pos["es1ic0_try"] = {"value": es1ic0_try}
# IC1 height
d = bl.es1ic1.center[1] - bl.fm.center[1]
es1ic1_try = (
fm_beam_height - d * np.tan(-(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"])) - es1_try
)
pos["es1ic1_try"] = {"value": es1ic1_try}
# IC2 height
d = bl.es1ic2.center[1] - bl.fm.center[1]
es1ic2_try = (
fm_beam_height - d * np.tan(-(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"])) - es1_try
)
pos["es1ic2_try"] = {"value": es1ic2_try}
# ES2 table height
d = bl.es2.center[1] - bl.fm.center[1]
es2_try = fm_beam_height - d * np.tan(-(2 * cfg["cm_pitch"] - 2 * cfg["fm_rotx"]))
pos["es2_try"] = {"value": es2_try}
return pos
@@ -37,7 +37,7 @@ def calc_surfaces(cfg: ConfigDict) -> SurfaceDict:
w1 = 2 * (bl.cm.center[1] - l / 2) * np.tan(cfg["h_acc"])
w2 = 2 * (bl.cm.center[1] + l / 2) * np.tan(cfg["h_acc"])
index = bl.cm.surface.index(cfg["cm_stripe"])
# index = bl.cm.surface.index(cfg["cm_stripe"])
cen = -cfg["cm_trx"]
@@ -96,6 +96,8 @@ def calc_surfaces(cfg: ConfigDict) -> SurfaceDict:
out["mo1_2"]["x"] = []
out["mo1_2"]["y"] = []
if cfg["fm_stripe"] is None:
return out
# Focusing mirror
if cfg["fm_stripe"] in ("Rh (toroid)", "Pt (toroid)"):
surface = bl.fm.surfaceToroid
@@ -85,9 +85,10 @@ def fm_trx_to_stripe(fm_trx: float) -> str | None:
str | None: Stripe of the mirror, None if not found
"""
fm_stripe = None
for name, low, high in zip(bl.fm.surfaceFlat, bl.fm.limOptXFlat[1], bl.fm.limOptXFlat[0]):
if low <= fm_trx <= high:
fm_stripe = name + " (flat)"
if hasattr(bl.fm, "surfaceFlat"):
for name, low, high in zip(bl.fm.surfaceFlat, bl.fm.limOptXFlat[1], bl.fm.limOptXFlat[0]):
if low <= fm_trx <= high:
fm_stripe = name + " (flat)"
for name, low, high in zip(bl.fm.surfaceToroid, bl.fm.limOptXToroid[1], bl.fm.limOptXToroid[0]):
if low <= fm_trx <= high:
fm_stripe = name + " (toroid)"
@@ -105,9 +106,10 @@ def fm_stripe_to_trx(fm_stripe: str) -> float | None:
Returns:
float | None: TRX value of the stripe. None if not found
"""
for name, low, high in zip(bl.fm.surfaceFlat, bl.fm.limOptXFlat[1], bl.fm.limOptXFlat[0]):
if fm_stripe == name + " (flat)":
return (low + high) / 2
if hasattr(bl.fm, "surfaceFlat"):
for name, low, high in zip(bl.fm.surfaceFlat, bl.fm.limOptXFlat[1], bl.fm.limOptXFlat[0]):
if fm_stripe == name + " (flat)":
return (low + high) / 2
for name, low, high in zip(bl.fm.surfaceToroid, bl.fm.limOptXToroid[1], bl.fm.limOptXToroid[0]):
if fm_stripe == name + " (toroid)":
return -(low + high) / 2
@@ -165,6 +167,8 @@ def cm_reflectivity(cm_stripe: str, cm_pitch: float, energy: float) -> float:
Returns:
float: Reflectivity [0-1]
"""
if cm_stripe is None:
return np.nan
index = bl.cm.surface.index(cm_stripe)
rs, _ = bl.cm.material[index].get_amplitude(energy, np.sin(cm_pitch))[0:2]
refl = abs(rs) ** 2
@@ -184,6 +188,8 @@ def fm_reflectivity(fm_stripe: str, fm_pitch: float, energy: float) -> float:
Returns:
float: Reflectivity [0-1]
"""
if fm_stripe is None:
return np.nan
if fm_stripe in ("Rh (toroid)", "Pt (toroid)"):
surface = bl.fm.surfaceToroid
material = bl.fm.materialToroid
@@ -391,6 +397,8 @@ def wall_geometries() -> list[list[float]]:
list[list[float]]: List of [x, y, width, height] geometry values for each wall.
"""
geom = []
if not hasattr(bl, "walls"):
return geom
for i, _ in enumerate(bl.walls.start):
geom.append(
[
@@ -413,6 +421,8 @@ def pipe_geometries() -> list[dict[str, np.ndarray]]:
the start and end coordinates of the pipe top and bottom edges.
"""
pipes = []
if not hasattr(bl, "vacuum_pipes"):
return pipes
for i, _ in enumerate(bl.vacuum_pipes.center):
top = bl.vacuum_pipes.center[i] + bl.vacuum_pipes.diameter[i] / 2 + bl.sourceHeight
bottom = bl.vacuum_pipes.center[i] - bl.vacuum_pipes.diameter[i] / 2 + bl.sourceHeight
@@ -439,6 +449,8 @@ def table_to_smpl_pos(table: str) -> float:
table (str): Table name, e.g. ES1 or ES2
"""
if table in bl.tables:
return bl.tables[table]["smpl"]
if table == bl.es1.name:
return bl.es1.center[1]
if table == bl.es2.name:
return bl.es2.center[1]
raise ValueError(f"Table {table} not found in beamline parameter file")
@@ -117,12 +117,12 @@ class DigitalTwin(BECWidget, QWidget):
self.plot_layout.setContentsMargins(4, 4, 4, 4)
self.plot_layout.setSpacing(6)
self.sideview_plot = SideviewPlot()
self.surface_plots = SurfacePlots()
self.surface_plots = SurfacePlots(self.beamline)
self.plot_layout.addWidget(self.sideview_plot, stretch=1)
self.plot_layout.addWidget(self.surface_plots, stretch=1)
self.plot_widget.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self.mover = MoverPanel(self.dev)
self.mover = MoverPanel(self.beamline, self.dev)
self.input_scroll = self._scroll_area(self.input_widget, min_width=320, max_width=360)
self.mover_scroll = self._scroll_area(self.mover, min_width=380, max_width=460)
@@ -232,7 +232,6 @@ class DigitalTwin(BECWidget, QWidget):
"fm_roty",
"fm_rotz",
"bm2_try",
"ot_try",
"es0wi_try",
]
if self.beamline == "x01da": # X01DA specific devices
@@ -467,7 +466,14 @@ class DigitalTwin(BECWidget, QWidget):
fm_stripe = fm_trx_to_stripe(-fm_trx)
fm_rotx = self.dev.fm_rotx.read(cached=True)["fm_rotx"]["value"]
fm_rotx_real = 2 * cm_pitch - fm_rotx
smpl = self.dev.ot_es1_trz.read(cached=True)["ot_es1_trz"]["value"]
match self.input.smpl:
case InputNumberField():
smpl = self.dev.ot_es1_trz.read(cached=True)["ot_es1_trz"]["value"]
case ComboBox():
table = self.input.smpl.currentText()
smpl = table_to_smpl_pos(table)
raw = { # Config in SI units!
"energy": mo1_bragg["mo1_bragg"]["value"],
"h_acc": h_acc,
@@ -525,17 +531,34 @@ class DigitalTwin(BECWidget, QWidget):
self.mover.fm_rotx.set_feedback(fm_rotx)
self.mover.fm_roty.set_feedback(self.dev.fm_roty.read(cached=True)["fm_roty"]["value"])
self.mover.fm_rotz.set_feedback(self.dev.fm_rotz.read(cached=True)["fm_rotz"]["value"])
self.mover.sl2_centery.set_feedback(
self.dev.sl2_centery.read(cached=True)["sl2_centery"]["value"]
)
self.mover.sl2_gapy.set_feedback(self.dev.sl2_gapy.read(cached=True)["sl2_gapy"]["value"])
if self.beamline == "x01da":
self.mover.sl2_centery.set_feedback(
self.dev.sl2_centery.read(cached=True)["sl2_centery"]["value"]
)
self.mover.sl2_gapy.set_feedback(
self.dev.sl2_gapy.read(cached=True)["sl2_gapy"]["value"]
)
self.mover.bm2_try.set_feedback(self.dev.bm2_try.read(cached=True)["bm2_try"]["value"])
self.mover.ot_try.set_feedback(self.dev.ot_try.read(cached=True)["ot_try"]["value"])
self.mover.ot_rotx.set_feedback(self.dev.ot_rotx.read(cached=True)["ot_rotx"]["value"])
self.mover.ot_es1_trz.set_feedback(smpl)
if self.beamline == "x01da":
self.mover.ot_try.set_feedback(self.dev.ot_try.read(cached=True)["ot_try"]["value"])
self.mover.ot_rotx.set_feedback(self.dev.ot_rotx.read(cached=True)["ot_rotx"]["value"])
self.mover.ot_es1_trz.set_feedback(smpl)
self.mover.es0wi_try.set_feedback(
self.dev.es0wi_try.read(cached=True)["es0wi_try"]["value"]
)
if self.beamline == "x10da":
self.mover.es1_try.set_feedback(self.dev.es1_try.read(cached=True)["es1_try"]["value"])
self.mover.es1ic0_try.set_feedback(
self.dev.es1ic0_try.read(cached=True)["es1ic0_try"]["value"]
)
self.mover.es1ic1_try.set_feedback(
self.dev.es1ic1_try.read(cached=True)["es1ic1_try"]["value"]
)
self.mover.es1ic2_try.set_feedback(
self.dev.es1ic2_try.read(cached=True)["es1ic2_try"]["value"]
)
self.mover.es2_try.set_feedback(self.dev.es2_try.read(cached=True)["es2_try"]["value"])
self.mover.abs.set_feedback(abs_open)
return config
@@ -553,7 +576,8 @@ class DigitalTwin(BECWidget, QWidget):
pos["mo1_trx"] = self.dev.mo1_trx.read(cached=True)["mo1_trx"]["value"]
pos["fm_trx"] = self.dev.fm_trx.read(cached=True)["fm_trx"]["value"]
pos["fm_rotx"] = self.dev.fm_rotx.read(cached=True)["fm_rotx"]["value"]
pos["ot_es1_trz"] = self.dev.ot_es1_trz.read(cached=True)["ot_es1_trz"]["value"]
if self.beamline == "x01da":
pos["ot_es1_trz"] = self.dev.ot_es1_trz.read(cached=True)["ot_es1_trz"]["value"]
# Removing offsets
for axis, _ in pos.items():
@@ -819,7 +843,7 @@ class DigitalTwin(BECWidget, QWidget):
"""
Calculates the positions for the axes based on the assistant values
"""
out = calc_positions(self.get_assistant_config())
out = calc_positions(self.beamline, self.get_assistant_config())
# Apply offsets
for axis, axis_data in out.items():
@@ -851,13 +875,21 @@ class DigitalTwin(BECWidget, QWidget):
self.mover.fm_rotx.set_target(out["fm_rotx"]["value"])
self.mover.fm_roty.set_target(out["fm_roty"]["value"])
self.mover.fm_rotz.set_target(out["fm_rotz"]["value"])
self.mover.sl2_centery.set_target(out["sl2_centery"]["value"])
self.mover.sl2_gapy.set_target(out["sl2_gapy"]["value"])
if self.beamline == "x01da":
self.mover.sl2_centery.set_target(out["sl2_centery"]["value"])
self.mover.sl2_gapy.set_target(out["sl2_gapy"]["value"])
self.mover.bm2_try.set_target(out["bm2_try"]["value"])
self.mover.ot_try.set_target(out["ot_try"]["value"])
self.mover.ot_rotx.set_target(out["ot_rotx"]["value"])
self.mover.ot_es1_trz.set_target(out["ot_es1_trz"]["value"])
if self.beamline == "x01da":
self.mover.ot_try.set_target(out["ot_try"]["value"])
self.mover.ot_rotx.set_target(out["ot_rotx"]["value"])
self.mover.ot_es1_trz.set_target(out["ot_es1_trz"]["value"])
self.mover.es0wi_try.set_target(out["es0wi_try"]["value"])
if self.beamline == "x10da":
self.mover.es1_try.set_target(out["es1_try"]["value"])
self.mover.es1ic0_try.set_target(out["es1ic0_try"]["value"])
self.mover.es1ic1_try.set_target(out["es1ic1_try"]["value"])
self.mover.es1ic2_try.set_target(out["es1ic2_try"]["value"])
self.mover.es2_try.set_target(out["es2_try"]["value"])
def calc_mo1_bragg_angle(self):
"""
@@ -7,6 +7,7 @@ from typing import Literal
# pylint: disable=E0611
from qtpy.QtWidgets import QVBoxLayout, QWidget
from ..types import BeamlineId
from ..widgets.move_widget import AbsorberWidget, MoveWidget
from ..widgets.qt_widgets import Group
@@ -14,7 +15,7 @@ from ..widgets.qt_widgets import Group
class MoverPanel(QWidget):
""" "Panel to move an axis to a certain position"""
def __init__(self, dev, parent=None):
def __init__(self, beamline: BeamlineId, dev, parent=None):
super().__init__(parent)
self._layout = QVBoxLayout(self)
self._layout.setContentsMargins(4, 4, 4, 4)
@@ -24,12 +25,24 @@ class MoverPanel(QWidget):
# FE Slits
self.sldi_gapx = MoveWidget(
dev=dev, motor="sldi_gapx", label="GAPX", unit="mm", decimals=2, deadband=0.01
beamline=beamline,
dev=dev,
motor="sldi_gapx",
label="GAPX",
unit="mm",
decimals=2,
deadband=0.01,
)
self.mover_widgets.append(self.sldi_gapx)
self.sldi_gapy = MoveWidget(
dev=dev, motor="sldi_gapy", label="GAPY", unit="mm", decimals=2, deadband=0.01
beamline=beamline,
dev=dev,
motor="sldi_gapy",
label="GAPY",
unit="mm",
decimals=2,
deadband=0.01,
)
self.mover_widgets.append(self.sldi_gapy)
@@ -42,22 +55,46 @@ class MoverPanel(QWidget):
# Collimating mirror
self.cm_trx = MoveWidget(
dev=dev, motor="cm_trx", label="TRX", unit="mm", decimals=2, deadband=0.01
beamline=beamline,
dev=dev,
motor="cm_trx",
label="TRX",
unit="mm",
decimals=2,
deadband=0.01,
)
self.mover_widgets.append(self.cm_trx)
self.cm_try = MoveWidget(
dev=dev, motor="cm_try", label="TRY", unit="mm", decimals=2, deadband=0.01
beamline=beamline,
dev=dev,
motor="cm_try",
label="TRY",
unit="mm",
decimals=2,
deadband=0.01,
)
self.mover_widgets.append(self.cm_try)
self.cm_bnd = MoveWidget(
dev=dev, motor="cm_bnd", label="BENDER", unit="km", decimals=2, deadband=0.2
beamline=beamline,
dev=dev,
motor="cm_bnd",
label="BENDER",
unit="km",
decimals=2,
deadband=0.2,
)
self.mover_widgets.append(self.cm_bnd)
self.cm_rotx = MoveWidget(
dev=dev, motor="cm_rotx", label="PITCH", unit="mrad", decimals=3, deadband=0.01
beamline=beamline,
dev=dev,
motor="cm_rotx",
label="PITCH",
unit="mrad",
decimals=3,
deadband=0.01,
)
self.mover_widgets.append(self.cm_rotx)
@@ -67,6 +104,7 @@ class MoverPanel(QWidget):
# Monochromator
self.mo1_bragg_angle = MoveWidget(
beamline=beamline,
dev=dev,
motor="mo1_bragg_angle",
label="Bragg Angle",
@@ -77,12 +115,24 @@ class MoverPanel(QWidget):
self.mover_widgets.append(self.mo1_bragg_angle)
self.mo1_trx = MoveWidget(
dev=dev, motor="mo1_trx", label="TRX", unit="mm", decimals=2, deadband=0.01
beamline=beamline,
dev=dev,
motor="mo1_trx",
label="TRX",
unit="mm",
decimals=2,
deadband=0.01,
)
self.mover_widgets.append(self.mo1_trx)
self.mo1_try = MoveWidget(
dev=dev, motor="mo1_try", label="TRY", unit="mm", decimals=2, deadband=0.01
beamline=beamline,
dev=dev,
motor="mo1_try",
label="TRY",
unit="mm",
decimals=2,
deadband=0.01,
)
self.mover_widgets.append(self.mo1_try)
@@ -92,12 +142,24 @@ class MoverPanel(QWidget):
# OP Slits 1
self.sl1_centery = MoveWidget(
dev=dev, motor="sl1_centery", label="CENTERY", unit="mm", decimals=2, deadband=0.1
beamline=beamline,
dev=dev,
motor="sl1_centery",
label="CENTERY",
unit="mm",
decimals=2,
deadband=0.1,
)
self.mover_widgets.append(self.sl1_centery)
self.sl1_gapy = MoveWidget(
dev=dev, motor="sl1_gapy", label="GAPY", unit="mm", decimals=2, deadband=0.1
beamline=beamline,
dev=dev,
motor="sl1_gapy",
label="GAPY",
unit="mm",
decimals=2,
deadband=0.1,
)
self.mover_widgets.append(self.sl1_gapy)
@@ -105,7 +167,13 @@ class MoverPanel(QWidget):
# OP Beam Monitor 1
self.bm1_try = MoveWidget(
dev=dev, motor="bm1_try", label="TRY", unit="mm", decimals=2, deadband=0.1
beamline=beamline,
dev=dev,
motor="bm1_try",
label="TRY",
unit="mm",
decimals=2,
deadband=0.1,
)
self.mover_widgets.append(self.bm1_try)
@@ -113,32 +181,68 @@ class MoverPanel(QWidget):
# Focusing Mirror
self.fm_trx = MoveWidget(
dev=dev, motor="fm_trx", label="TRX", unit="mm", decimals=2, deadband=0.01
beamline=beamline,
dev=dev,
motor="fm_trx",
label="TRX",
unit="mm",
decimals=2,
deadband=0.01,
)
self.mover_widgets.append(self.fm_trx)
self.fm_try = MoveWidget(
dev=dev, motor="fm_try", label="TRY", unit="mm", decimals=2, deadband=0.01
beamline=beamline,
dev=dev,
motor="fm_try",
label="TRY",
unit="mm",
decimals=2,
deadband=0.01,
)
self.mover_widgets.append(self.fm_try)
self.fm_bnd = MoveWidget(
dev=dev, motor="fm_bnd", label="BENDER", unit="km", decimals=2, deadband=0.2
beamline=beamline,
dev=dev,
motor="fm_bnd",
label="BENDER",
unit="km",
decimals=2,
deadband=0.2,
)
self.mover_widgets.append(self.fm_bnd)
self.fm_rotx = MoveWidget(
dev=dev, motor="fm_rotx", label="PITCH", unit="mrad", decimals=3, deadband=0.01
beamline=beamline,
dev=dev,
motor="fm_rotx",
label="PITCH",
unit="mrad",
decimals=3,
deadband=0.01,
)
self.mover_widgets.append(self.fm_rotx)
self.fm_roty = MoveWidget(
dev=dev, motor="fm_roty", label="YAW", unit="mrad", decimals=3, deadband=0.01
beamline=beamline,
dev=dev,
motor="fm_roty",
label="YAW",
unit="mrad",
decimals=3,
deadband=0.01,
)
self.mover_widgets.append(self.fm_roty)
self.fm_rotz = MoveWidget(
dev=dev, motor="fm_rotz", label="ROLL", unit="mrad", decimals=3, deadband=0.01
beamline=beamline,
dev=dev,
motor="fm_rotz",
label="ROLL",
unit="mrad",
decimals=3,
deadband=0.01,
)
self.mover_widgets.append(self.fm_rotz)
@@ -147,74 +251,202 @@ class MoverPanel(QWidget):
[self.fm_trx, self.fm_try, self.fm_bnd, self.fm_rotx, self.fm_roty, self.fm_rotz],
)
# OP Slits 2
self.sl2_centery = MoveWidget(
dev=dev, motor="sl2_centery", label="CENTERY", unit="mm", decimals=2, deadband=0.1
)
self.mover_widgets.append(self.sl2_centery)
if beamline == "x01da":
# OP Slits 2
self.sl2_centery = MoveWidget(
beamline=beamline,
dev=dev,
motor="sl2_centery",
label="CENTERY",
unit="mm",
decimals=2,
deadband=0.1,
)
self.mover_widgets.append(self.sl2_centery)
self.sl2_gapy = MoveWidget(
dev=dev, motor="sl2_gapy", label="GAPY", unit="mm", decimals=2, deadband=0.1
)
self.mover_widgets.append(self.sl2_gapy)
self.sl2_gapy = MoveWidget(
beamline=beamline,
dev=dev,
motor="sl2_gapy",
label="GAPY",
unit="mm",
decimals=2,
deadband=0.1,
)
self.mover_widgets.append(self.sl2_gapy)
self.sl2_mov_group = Group("OP Slits 2", [self.sl2_centery, self.sl2_gapy])
self.sl2_mov_group = Group("OP Slits 2", [self.sl2_centery, self.sl2_gapy])
# OP Beam Monitor 2
self.bm2_try = MoveWidget(
dev=dev, motor="bm2_try", label="TRY", unit="mm", decimals=2, deadband=0.1
beamline=beamline,
dev=dev,
motor="bm2_try",
label="TRY",
unit="mm",
decimals=2,
deadband=0.1,
)
self.mover_widgets.append(self.bm2_try)
self.bm2_mov_group = Group("OP Beam Monitor 2", [self.bm2_try])
# Optical Table
self.ot_try = MoveWidget(
dev=dev, motor="ot_try", label="TRY", unit="mm", decimals=2, deadband=0.2
)
self.mover_widgets.append(self.ot_try)
if beamline == "x01da":
# Optical Table
self.ot_try = MoveWidget(
beamline=beamline,
dev=dev,
motor="ot_try",
label="TRY",
unit="mm",
decimals=2,
deadband=0.2,
)
self.mover_widgets.append(self.ot_try)
self.ot_rotx = MoveWidget(
dev=dev, motor="ot_rotx", label="ROTX", unit="mrad", decimals=3, deadband=0.05
)
self.mover_widgets.append(self.ot_rotx)
self.ot_rotx = MoveWidget(
beamline=beamline,
dev=dev,
motor="ot_rotx",
label="ROTX",
unit="mrad",
decimals=3,
deadband=0.05,
)
self.mover_widgets.append(self.ot_rotx)
self.ot_mov_group = Group("Optical Table", [self.ot_try, self.ot_rotx])
self.ot_mov_group = Group("Optical Table", [self.ot_try, self.ot_rotx])
# Experimental Station 0
self.es0wi_try = MoveWidget(
dev=dev, motor="es0wi_try", label="ES0 WI", unit="mm", decimals=0, deadband=0.1
beamline=beamline,
dev=dev,
motor="es0wi_try",
label="ES0 WI",
unit="mm",
decimals=2,
deadband=0.1,
)
self.mover_widgets.append(self.es0wi_try)
self.es0_mov_group = Group("Experimental Station 0", [self.es0wi_try])
# Experimental Station 1
self.ot_es1_trz = MoveWidget(
dev=dev, motor="ot_es1_trz", label="ES1 TRZ", unit="mm", decimals=0, deadband=5
)
self.mover_widgets.append(self.ot_es1_trz)
if beamline == "x01da":
self.ot_es1_trz = MoveWidget(
beamline=beamline,
dev=dev,
motor="ot_es1_trz",
label="ES1 TRZ",
unit="mm",
decimals=0,
deadband=5,
)
self.mover_widgets.append(self.ot_es1_trz)
self.es1_mov_group = Group("Experimental Station 1", [self.ot_es1_trz])
if beamline == "x10da":
self.es1_try = MoveWidget(
beamline=beamline,
dev=dev,
motor="es1_try",
label="ES1 TRY",
unit="mm",
decimals=2,
deadband=0.01,
)
self.mover_widgets.append(self.es1_try)
self.es1ic0_try = MoveWidget(
beamline=beamline,
dev=dev,
motor="es1ic0_try",
label="IC0 TRY",
unit="mm",
decimals=2,
deadband=0.1,
)
self.mover_widgets.append(self.es1ic0_try)
self.es1ic1_try = MoveWidget(
beamline=beamline,
dev=dev,
motor="es1ic1_try",
label="IC1 TRY",
unit="mm",
decimals=2,
deadband=0.1,
)
self.mover_widgets.append(self.es1ic1_try)
self.es1ic2_try = MoveWidget(
beamline=beamline,
dev=dev,
motor="es1ic2_try",
label="IC2 TRY",
unit="mm",
decimals=2,
deadband=0.1,
)
self.mover_widgets.append(self.es1ic2_try)
if beamline == "x01da":
self.es1_mov_group = Group("Experimental Station 1", [self.ot_es1_trz])
else:
self.es1_mov_group = Group(
"Experimental Station 1", [self.es1_try, self.es1ic1_try, self.es1ic2_try]
)
# Experimental Station 2
if beamline == "x10da":
self.es2_try = MoveWidget(
beamline=beamline,
dev=dev,
motor="es2_try",
label="ES2 TRY",
unit="mm",
decimals=2,
deadband=0.01,
)
self.mover_widgets.append(self.es2_try)
self.es2_mov_group = Group("Experimental Station 2", [self.es2_try])
# Assemble complete mover group
self.mover_group = Group(
"Mover",
[
self.sldi_mov_group,
self.abs_group,
self.cm_mov_group,
self.mo1_mov_group,
self.sl1_mov_group,
self.bm1_mov_group,
self.fm_mov_group,
self.sl2_mov_group,
self.bm2_mov_group,
self.ot_mov_group,
self.es0_mov_group,
self.es1_mov_group,
],
)
if beamline == "x01da":
self.mover_group = Group(
"Mover",
[
self.sldi_mov_group,
self.abs_group,
self.cm_mov_group,
self.mo1_mov_group,
self.sl1_mov_group,
self.bm1_mov_group,
self.fm_mov_group,
self.sl2_mov_group,
self.bm2_mov_group,
self.ot_mov_group,
self.es0_mov_group,
self.es1_mov_group,
],
)
else:
self.mover_group = Group(
"Mover",
[
self.sldi_mov_group,
self.abs_group,
self.cm_mov_group,
self.mo1_mov_group,
self.sl1_mov_group,
self.bm1_mov_group,
self.fm_mov_group,
self.bm2_mov_group,
self.es0_mov_group,
self.es1_mov_group,
self.es2_mov_group,
],
)
self._layout.addWidget(self.mover_group)
self._layout.addStretch()
@@ -21,7 +21,7 @@ from ..calculations.calc_varia import (
pipe_geometries,
wall_geometries,
)
from ..types import DataDict, SurfaceDict
from ..types import BeamlineId, DataDict, SurfaceDict
from ..widgets.qt_widgets import Group
logger = bec_logger.logger
@@ -30,8 +30,9 @@ logger = bec_logger.logger
class SurfacePlots(QWidget):
"""Plot widget with two curves and legend."""
def __init__(self, parent=None):
def __init__(self, beamline: BeamlineId, parent=None):
super().__init__(parent=parent)
self.beamline = beamline
self._layout = QHBoxLayout(self)
self._layout.setContentsMargins(4, 4, 4, 4)
self._layout.setSpacing(6)
@@ -174,7 +175,8 @@ class SurfacePlots(QWidget):
elif name == "mo1_2":
plot_surface(plot["widget"], mo_surface_geometries("mo1", 1))
elif name == "fm":
plot_surface(plot["widget"], mirror_surface_geometries("fm_flat"))
if self.beamline == "x01da":
plot_surface(plot["widget"], mirror_surface_geometries("fm_flat"))
plot_surface(plot["widget"], mirror_surface_geometries("fm_toroid"))
else:
raise ValueError(f"Plot {name} not found!")
@@ -17,6 +17,7 @@ from qtpy.QtWidgets import QApplication, QHBoxLayout, QLabel, QPushButton, QWidg
# pylint: disable=E0402
from .....devices.absorber import STATUS as ABS_STATUS
from ..types import BeamlineId
logger = bec_logger.logger
@@ -132,8 +133,9 @@ class MotionWorker(QObject):
error = Signal()
finished = Signal()
def __init__(self, dev, motor, target_pos: float):
def __init__(self, beamline: BeamlineId, dev, motor, target_pos: float):
super().__init__()
self.beamline = beamline
self.dev = dev
self.motor = motor
self._target = target_pos
@@ -182,16 +184,23 @@ class MotionWorker(QObject):
],
)
case "cm_bnd":
p1 = (
1 / (self.dev.cm_bnd_radius.read()["cm_bnd_radius"]["value"] * 1e3) + 0.0284
) / 2e-6
p2 = (1 / (self._target * 1e3) + 0.0284) / 2e-6
if self.beamline == "x01da":
p1 = (
1 / (self.dev.cm_bnd_radius.read()["cm_bnd_radius"]["value"] * 1e3) + 0.0284
) / 2e-6
p2 = (1 / (self._target * 1e3) + 0.0284) / 2e-6
else:
p1 = 541900 / self.dev.cm_bnd_radius.read()["cm_bnd_radius"]["value"] - 32570
p2 = 541900 / self._target - 32570
self._target = p2 - p1
self.motion(relative=True, rb={"device": self.dev["cm_bnd_radius"]})
case "mo1_try" | "mo1_trx" | "mo1_roty":
self.motion(abs_closed=True)
case "mo1_bragg_angle":
self.motion()
if self.beamline == "x01da":
self.motion()
else: # x10da needs to move goniometer
self.motion(alias="mo1_rotx")
case "sl1_centery" | "sl1_gapy" | "bm1_try":
self.motion()
case "fm_trx":
@@ -204,44 +213,77 @@ class MotionWorker(QObject):
abs_closed=True, surveyed_axes=[{"device": self.dev["fm_trx"], "abs_tol": 0.05}]
)
case "fm_try":
if self.beamline == "x01da":
abs_tol = 0.05
else: # superxas mirror less stable thus needs higher tolerance
abs_tol = 0.2
self.motion(
abs_closed=True,
surveyed_axes=[
{"device": self.dev["fm_rotx"], "abs_tol": 0.05},
{"device": self.dev["fm_rotz"], "abs_tol": 0.05},
{"device": self.dev["fm_rotx"], "abs_tol": abs_tol},
{"device": self.dev["fm_rotz"], "abs_tol": abs_tol},
],
)
case "fm_rotx":
if self.beamline == "x01da":
abs_tol = 0.05
else: # superxas mirror less stable thus needs higher tolerance
abs_tol = 0.2
self.motion(
abs_closed=True,
surveyed_axes=[
{"device": self.dev["fm_try"], "abs_tol": 0.05},
{"device": self.dev["fm_rotz"], "abs_tol": 0.05},
{"device": self.dev["fm_try"], "abs_tol": abs_tol},
{"device": self.dev["fm_rotz"], "abs_tol": abs_tol},
],
)
case "fm_rotz":
if self.beamline == "x01da":
abs_tol = 0.05
else: # superxas mirror less stable thus needs higher tolerance
abs_tol = 0.2
self.motion(
abs_closed=True,
surveyed_axes=[
{"device": self.dev["fm_try"], "abs_tol": 0.05},
{"device": self.dev["fm_rotx"], "abs_tol": 0.05},
{"device": self.dev["fm_try"], "abs_tol": abs_tol},
{"device": self.dev["fm_rotx"], "abs_tol": abs_tol},
],
)
case "fm_bnd":
p1 = (
1 / (self.dev.fm_bnd_radius.read()["fm_bnd_radius"]["value"] * 1e3) + 4.28e-5
) / 1.84e-9
p2 = (1 / (self._target * 1e3) + 4.28e-5) / 1.84e-9
if self.beamline == "x01da":
p1 = (
1 / (self.dev.fm_bnd_radius.read()["fm_bnd_radius"]["value"] * 1e3)
+ 4.28e-5
) / 1.84e-9
p2 = (1 / (self._target * 1e3) + 4.28e-5) / 1.84e-9
else:
p1 = (
593088.7 / self.dev.fm_bnd_radius.read()["fm_bnd_radius"]["value"]
+ 26124.41
)
p2 = 593088.7 / self._target + 26124.41
self._target = p2 - p1
self.motion(relative=True, rb={"device": self.dev["fm_bnd_radius"]})
case "sl2_centery" | "sl2_gapy" | "bm2_try":
self.motion()
case "ot_try" | "ot_rotx" | "ot_es1_trz":
self.motion()
case "es0wi_try":
self.motion()
case "es1_try" | "es2_try":
self.motion()
case "es1ic0_try" | "es1ic1_try" | "es1ic2_try":
self.motion()
case _:
logger.warning(f"Motor {self.motor} not integrated in digital twin!")
def motion(self, abs_closed: bool = False, relative: bool = False, rb=None, surveyed_axes=None):
def motion(
self,
abs_closed: bool = False,
relative: bool = False,
rb=None,
surveyed_axes=None,
alias=None,
):
"""
Moves an axis while surverying a set of axes (if set).
Example surveyed_axes:
@@ -250,44 +292,53 @@ class MotionWorker(QObject):
Args:
surveyed_axes (list): List of dictionaries of devices
"""
if abs_closed:
if self.dev.abs.status.get() == ABS_STATUS.OPEN:
status = self.dev.abs.close()
# TODO Set timeout to 0.001 and check if it actually raises
# (it should not start motion).
# Check of behavior of digital twin afterwards.
status.wait(timeout=5)
if surveyed_axes is not None:
for surv_ax in surveyed_axes:
surv_ax["name"] = surv_ax["device"].dotted_name
surv_ax["old_value"] = surv_ax["device"].read(cached=True)[surv_ax["name"]]["value"]
if rb is not None:
rb["name"] = rb["device"].dotted_name
status = self.dev[self.motor].move(self._target, relative=relative)
last_check = time.time()
update_interval = 0.1
while status.status == "RUNNING":
now = time.time()
if time.time() - last_check < update_interval:
time.sleep(0.01)
last_check = now
if self._stop_flag.is_set():
self.dev[self.motor].stop()
self._stop_flag.clear()
if rb is not None:
self.position_changed.emit(rb["device"].read(cached=True)[rb["name"]]["value"])
else:
self.position_changed.emit(
self.dev[self.motor].read(cached=True)[self.motor]["value"]
)
try:
if alias:
self.motor = alias
if abs_closed:
if self.dev.abs.status.get() == ABS_STATUS.OPEN:
status = self.dev.abs.close()
# TODO Set timeout to 0.001 and check if it actually raises
# (it should not start motion).
# Check of behavior of digital twin afterwards.
status.wait(timeout=5)
if surveyed_axes is not None:
for surv_ax in surveyed_axes:
fb = surv_ax["device"].read(cached=True)[surv_ax["name"]]["value"]
if abs(fb - surv_ax["old_value"]) > surv_ax["abs_tol"]:
self.dev[self.motor].stop()
self.error.emit()
break
self.finished.emit()
surv_ax["name"] = surv_ax["device"].dotted_name
surv_ax["old_value"] = surv_ax["device"].read(cached=True)[surv_ax["name"]][
"value"
]
if rb is not None:
rb["name"] = rb["device"].dotted_name
status = self.dev[self.motor].move(self._target, relative=relative)
last_check = time.time()
update_interval = 0.1
while status.status == "RUNNING":
now = time.time()
if time.time() - last_check < update_interval:
time.sleep(0.01)
last_check = now
if self._stop_flag.is_set():
self.dev[self.motor].stop()
self._stop_flag.clear()
if rb is not None:
self.position_changed.emit(rb["device"].read(cached=True)[rb["name"]]["value"])
else:
self.position_changed.emit(
self.dev[self.motor].read(cached=True)[self.motor]["value"]
)
if surveyed_axes is not None:
for surv_ax in surveyed_axes:
fb = surv_ax["device"].read(cached=True)[surv_ax["name"]]["value"]
if abs(fb - surv_ax["old_value"]) > surv_ax["abs_tol"]:
self.dev[self.motor].stop()
self.error.emit()
self.finished.emit()
break
self.finished.emit()
except:
self.error.emit()
self.finished.emit()
class MoveWidget(QWidget):
@@ -299,10 +350,13 @@ class MoveWidget(QWidget):
- Start / Stop button
"""
def __init__(self, dev, motor, label: str = "", unit=None, decimals=3, deadband=0.0):
def __init__(
self, beamline: BeamlineId, dev, motor, label: str = "", unit=None, decimals=3, deadband=0.0
):
super().__init__()
self.fb = 0.0
self.target = 0
self.beamline = beamline
self.dev = dev
self.motor = motor
self.deadband = deadband
@@ -449,7 +503,7 @@ class MoveWidget(QWidget):
self._set_status(Status.MOVING)
self._apply_button_style("stop")
self._worker = MotionWorker(self.dev, self.motor, target)
self._worker = MotionWorker(self.beamline, self.dev, self.motor, target)
self._thread = QThread()
self._worker.moveToThread(self._thread)
@@ -0,0 +1,59 @@
cm_try:
offset: -0.7
mo1_try:
offset: -31.42
mo1_trx:
modifier:
axis: mo1_trx
range: [[-30, -0.1], [0.1, 30]]
offset: [-4.3, 0]
sl1_centery:
offset: -55.54
bm1_try:
offset: 52.22
fm_trx:
modifier:
axis: fm_trx
range: [[-100, -48], [-47, 0]]
offset: [-0.3, 0.52]
fm_try:
modifier:
axis: fm_trx
range: [[-100, -48], [-47, 0]]
offset: [-42.56, -41.49]
# pitch
fm_rotx:
modifier:
axis: fm_trx
range: [[-100, -48], [-47, 0]]
offset: [1.30, 1.049]
# yaw
fm_roty:
modifier:
axis: fm_trx
range: [[-100, -48], [-47, 0]]
offset: [1.754, 1.924]
bm2_try:
offset: -19
es0wi_try:
offset: -71.98
es1_try:
offset: -113.26
es1ic1_try:
offset: 10.39
es1ic2_try:
offset: 3.55
@@ -67,9 +67,16 @@ sls2_35t = bendingMagnet(name="FE-BM-SLS2-3.5T", center=(0, 0, 0), sync=sls2, B0
sls2_50t = bendingMagnet(name="FE-BM-SLS2-5.0T", center=(0, 0, 0), sync=sls2, B0=5.0)
# FE slits
slits = namedtuple("slits", ["name", "center", "maxDivH", "maxDivV"])
fe_slits = namedtuple("slits", ["name", "center", "center1", "center2", "maxDivH", "maxDivV"])
feSlits = slits(name="FE-SLITS", center=(0, 5290, sourceHeight), maxDivH=1.8e-3, maxDivV=0.8e-3)
feSlits = fe_slits(
name="FE-SLITS",
center=(0, 6117, sourceHeight),
center1=(0, 5038.4, sourceHeight),
center2=(0, 5282.9, sourceHeight),
maxDivH=1.8e-3,
maxDivV=0.8e-3,
)
# Filters
filt = namedtuple(
@@ -87,7 +94,7 @@ feWindow = filt(
thickness=0.1,
)
feWindow = feWindow._replace(
surface="CVD Diamond window {0:0.0f} $\mu$m".format(feWindow.thickness * 1e3)
surface="CVD Diamond window {0:0.0f} $\\mu$m".format(feWindow.thickness * 1e3)
)
feFilt = filt(
@@ -100,7 +107,7 @@ feFilt = filt(
material=filterGraphite,
thickness=0.25,
)
feFilt = feFilt._replace(surface="Graphite filter {0:0.0f} $\mu$m".format(feFilt.thickness * 1e3))
feFilt = feFilt._replace(surface="Graphite filter {0:0.0f} $\\mu$m".format(feFilt.thickness * 1e3))
# Collimating mirror
collimatingMirror = namedtuple(
@@ -126,12 +133,12 @@ collimatingMirror = namedtuple(
cm = collimatingMirror(
name="FE-CM",
center=[0, 7618, sourceHeight],
surface=("Si", "Pt", "Rh"),
material=(stripeSi, stripePt, stripeRh),
center=[0, 7560.8, sourceHeight],
surface=("Pt", "Si", "Rh"),
material=(stripePt, stripeSi, stripeRh),
limPhysX=(-30, 30),
limPhysY=(-600, 600),
limOptX=((-21, -8, 5), (-11, 2, 21)),
limOptX=((-21, -0.5, 11), (-4, 9.5, 23)),
limOptY=((-500, -500, -500), (500, 500, 500)),
R=[3e6, 15e6],
pitch=[1.4e-3, 4.5e-3],
@@ -152,10 +159,17 @@ opWbBsBlock = apertures(
name="OP-WB-BS-BLOCK", center=[0.0, 13606 - 135, sourceHeight], opening=[-18.0, 18.0, 42, 76]
) # left, right, bottom, top
opSlits = apertures(
name="OP-SLITS", center=[0, 14145 - 135, sourceHeight], opening=[-35 / 2, 35 / 2, 47.5, 82.5]
opSlits1 = apertures(
name="OP-SLITS 1", center=[0, 14145 - 135, sourceHeight], opening=[-35 / 2, 35 / 2, 47.5, 82.5]
)
# OP Beam Monitors
op_bm = namedtuple("op_bm", ["name", "center"])
opBM1 = op_bm(name="OP Beam Monitor 1", center=(0, 14525 - 135, sourceHeight))
opBM2 = op_bm(name="OP Beam Monitor 2", center=(0, 17161.6 - 135, sourceHeight))
# Monochromator
monochromator = namedtuple(
"monochromator",
@@ -187,7 +201,7 @@ mo1 = monochromator(
material1=(si311_1, si111_1),
material2=(si311_2, si111_2),
xtalWidth=(20, 20),
xtalOffsetX=(-19.2, 19.2),
xtalOffsetX=(19.2, -19.2),
xtalLength1=(60, 60),
xtalLength2=(60, 60),
xtalGap=(8, 8),
@@ -225,19 +239,24 @@ focusingMirror = namedtuple(
],
)
OFFSET_TRX = 46.8735
fm = focusingMirror(
name="OP-FM",
center=[0.0, 15580 - 135, sourceHeight],
surfaceToroid=("Rh (toroid)", "Pt (toroid)"),
surfaceToroid=("Rh", "Pt"),
materialToroid=(stripeRh, stripePt),
limPhysXToroid=(-54.0, 54.0),
limPhysYToroid=(-565.0, 565.0),
limOptXToroid=((4.865, -40.882), (43.388, -4.865)),
limOptXToroid=(
(43.388 + OFFSET_TRX, -4.865 + OFFSET_TRX),
(4.865 + OFFSET_TRX, -40.882 + OFFSET_TRX),
),
limOptYToroid=((-500.0, -500.0), (500.0, 500.0)),
R=[3e6, 15e6],
pitch=[1.4e-3, 4.5e-3],
r=[30, 20],
xToroid=[24.126, -22, 874], # offset in local x
xToroid=[24.126 + OFFSET_TRX, -22 + OFFSET_TRX], # offset in local x
hToroid=[7.0, 11.3], # depth of the cylinder at x = xCylinder1 and x = xCylinder2.
jack1=[0.0, 14980.0, 0.0],
jack2=[-75.0, 16180.0, 0.0],
@@ -246,9 +265,12 @@ fm = focusingMirror(
tx2=[0.0, 575.0],
) # X-Stage 2 [x, y]
# Entry wall experimental hutch: 21593 mm from source (SLS2)
# Exit window
ehWindow = filt(
name="EH-WINDOW",
center=(0.0, 22225 - 135, sourceHeight),
center=(0.0, 22063, sourceHeight),
pitch=np.pi / 2,
limPhysX=(-10.0, 10.0),
limPhysY=(17.5, 92.5),
@@ -257,13 +279,18 @@ ehWindow = filt(
thickness=0.25,
)
ehWindow = ehWindow._replace(
surface="Beryllium window {0:0.0f} $\mu$m".format(ehWindow.thickness * 1e3)
surface="Beryllium window {0:0.0f} $\\mu$m".format(ehWindow.thickness * 1e3)
)
# Sample
sample = namedtuple("sample", ["name", "center"])
smpl = sample(name="OP-SMPL", center=[0, 24000 - 135, sourceHeight])
es1 = sample(name="ES1", center=[0, 23823, sourceHeight])
es2 = sample(name="ES2", center=[0, 25843, sourceHeight])
ES1 = sample(name="EH-ES1", center=[0, 24000, sourceHeight])
ES2 = sample(name="EH-ES2", center=[0, 25000, sourceHeight])
# Ionization chambers
ic = namedtuple("sample", ["name", "center"])
es1ic0 = ic(name="ES1 IC0", center=[0, 23633, sourceHeight])
es1ic1 = ic(name="ES1 IC1", center=[0, 24383, sourceHeight])
es1ic2 = ic(name="ES1 IC2", center=[0, 24723, sourceHeight])
+14 -2
View File
@@ -13,9 +13,11 @@ from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
class AbsorberError(Exception):
"""Absorber specific exception"""
class STATUS(int, enum.Enum):
"""Absorber States"""
@@ -35,14 +37,24 @@ class STATUS(int, enum.Enum):
MAN_OPEN = 13
UNDEFINED = 14
class Absorber(PSIDeviceBase):
"""Class for the Frontend Absorber"""
USER_ACCESS = ["open", "close"]
request = Cpt(EpicsSignal, suffix="REQUEST", kind="config", doc="Open/Close Absorber")
status = Cpt(EpicsSignalRO, suffix="STATUS", kind="normal", doc="Absorber Status")
status_string = Cpt(EpicsSignalRO, suffix="STATUS", kind="normal", string=True, doc="Absorber Status")
status = Cpt(
EpicsSignalRO, suffix="STATUS", kind="normal", auto_monitor=True, doc="Absorber Status"
)
status_string = Cpt(
EpicsSignalRO,
suffix="STATUS",
kind="normal",
auto_monitor=True,
string=True,
doc="Absorber Status",
)
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
+17 -3
View File
@@ -232,7 +232,7 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
"break_enable_high", None
)
break_time_low = self.scan_parameters.additional_scan_parameters.get(
"break_time_low", None
"break_self.scatime_low", None
)
break_time_high = self.scan_parameters.additional_scan_parameters.get(
"break_time_high", None
@@ -335,9 +335,16 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
time_started = time.time()
logger.info(f"Calling complete on {self.name}.")
status = CompareStatus(self.scan_control.scan_done, 1)
status.add_callback(self._status_callback)
self.cancel_on_stop(status)
logger.info(f"Finished calling complete on {self.name} within {time.time()-time_started}s.")
return status
def _status_callback(self, status, **kwargs):
logger.info(f"Complete finished on mo1bragg with {status.done} and {status.success}")
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
@@ -369,14 +376,21 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
if scan_parameters.scan_name in self.valid_scan_names:
return True
return False
def _progress_update(self, value, **kwargs) -> None:
def _progress_update(self, value, old_value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback
to SUB_PROGRESS subscribers, i.e. BEC.
Args:
value (int) : current progress value
"""
if self.scan_info.msg is None:
return
if self.scan_info.msg.status != "open":
if old_value == value:
return
# TODO check if logic is true
self.progress_signal.put(value=100, max_value=100, done=True)
max_value = 100
self.progress_signal.put(value=value, max_value=max_value, done=bool(max_value == value))
+4
View File
@@ -1,5 +1,6 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
@@ -485,6 +486,8 @@ class Nidaq(PSIDeviceBase, NidaqControl):
For the NIDAQ we use this method to stop the backend since it
would not stop by itself in its current implementation since the number of points are not predefined.
"""
time_started = time.time()
logger.info(f"Calling complete on {self.name}.")
if not self._check_if_scan_name_is_valid(self.scan_parameters):
return None
@@ -492,6 +495,7 @@ class Nidaq(PSIDeviceBase, NidaqControl):
self.cancel_on_stop(status)
if self.scan_parameters.scan_name != "nidaq_continuous_scan":
self.on_stop()
logger.info(f"Finished calling complete on {self.name} within {time.time()-time_started}s.")
return status
def _progress_update(self, value, **kwargs) -> None:
+7 -1
View File
@@ -261,6 +261,7 @@ class Pilatus(PSIDeviceBase, ADBase):
# self._live_mode_stopped_event = threading.Event()
# self._live_mode_stopped_event.set() # Initial state is stopped
self.scan_parameters: ScanServerScanInfo = None
self._unique_array_id : int = 0
########################################
# Custom Beamline Methods #
@@ -271,6 +272,11 @@ class Pilatus(PSIDeviceBase, ADBase):
while not self._poll_thread_kill_event.wait(1 / self._poll_rate):
try:
# logger.info(f"Running poll loop for {self.name}..")
array_id = self.image1.unique_id.get()
if array_id != self._unique_array_id:
self._unique_array_id = array_id
else:
continue
value = self.image1.array_data.get()
if value is None:
continue
@@ -288,7 +294,7 @@ class Pilatus(PSIDeviceBase, ADBase):
# )
# continue
logger.info(f"Setting preview data for {self.name}")
# logger.info(f"Setting preview data for {self.name}")
self.preview.put(data)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()