refactoring
CI for debye_bec / test (push) Successful in 1m11s

This commit is contained in:
x01da
2026-05-19 06:44:04 +02:00
parent 6b5ff49b04
commit 62582da1d9
10 changed files with 513 additions and 261 deletions
@@ -1,42 +1,69 @@
import numpy as np
import debye_bec.bec_widgets.widgets.x01da_parameters as bl
from debye_bec.bec_widgets.widgets.digital_twin.types import DataDict
def calc_sideview(cfg):
# Calculate height of beam after CM
height = 2 * bl.cm.center[1] * np.tan(cfg['v_acc'])
# height = 2 * bl.cm.center[1] * np.tan(cfg["v_acc"])
# beam height (Y=height, Z=along beam)
beam = {}
beam['x'] = []
beam['y'] = []
beam['x'].append(0) # Source
beam['y'].append(bl.sourceHeight)
beam['x'].append(bl.cm.center[1]) # CM
beam['y'].append(bl.sourceHeight)
if cfg['mo1_mode'] in 'Monochromatic':
diag = bl.mo1.xtalGap[0]/np.sin(cfg['mo1_bragg']) # Calculations for Mono
dy = diag*np.sin(2*(cfg['cm_pitch']+cfg['mo1_bragg']))
dz = diag*np.cos(2*(cfg['cm_pitch']+cfg['mo1_bragg']))
beam['x'].append(bl.mo1.center[1]-dz/2) # Mono 1.1
beam['y'].append(bl.sourceHeight+np.tan(2*cfg['cm_pitch'])*(bl.mo1.center[1]-dz/2-bl.cm.center[1]))
beam['x'].append(bl.mo1.center[1]+dz/2) # Mono 1.2
beam['y'].append(bl.sourceHeight+np.tan(2*cfg['cm_pitch'])*(bl.mo1.center[1]-dz/2-bl.cm.center[1])+dy)
beam['x'].append(bl.fm.center[1]) # FM
beam['y'].append(bl.sourceHeight+np.tan(2*cfg['cm_pitch'])*(bl.fm.center[1]-bl.cm.center[1]-dz)+dy)
beam['x'].append(cfg['smpl']) # Experiment
beam['y'].append(bl.sourceHeight+np.tan(2*cfg['cm_pitch'])*(bl.fm.center[1]-bl.cm.center[1]-dz)+dy+np.tan(2*(cfg['cm_pitch']-cfg['fm_rotx']))*(cfg['smpl']-bl.fm.center[1]))
elif cfg['mo1_mode'] == 'Pinkbeam':
beam['x'].append(bl.fm.center[1]) # FM
beam['y'].append(bl.sourceHeight+np.tan(2*cfg['cm_pitch'])*(bl.fm.center[1]-bl.cm.center[1]))
beam['x'].append(cfg['smpl']) # Experiment
beam['y'].append(bl.sourceHeight+np.tan(2*cfg['cm_pitch'])*(bl.fm.center[1]-bl.cm.center[1])+np.tan(2*(cfg['cm_pitch']-cfg['fm_rotx']))*(cfg['smpl']-bl.fm.center[1]))
dy_fm_ex = beam['y'][-1] - beam['y'][-2]
dz_fm_ex = beam['x'][-1] - beam['x'][-2]
dz_fm_win = bl.ehWindow.center[1] - beam['x'][-2]
h_at_win = beam['y'][-2] + dy_fm_ex / dz_fm_ex * dz_fm_win
beam: DataDict = {"x": [], "y": []}
beam['heightWindow'] = h_at_win
beam["x"] = []
beam["y"] = []
beam["x"].append(0) # Source
beam["y"].append(bl.sourceHeight)
beam["x"].append(bl.cm.center[1]) # CM
beam["y"].append(bl.sourceHeight)
if cfg["mo1_mode"] in "Monochromatic":
diag = bl.mo1.xtalGap[0] / np.sin(cfg["mo1_bragg"]) # Calculations for Mono
dy = diag * np.sin(2 * (cfg["cm_pitch"] + cfg["mo1_bragg"]))
dz = diag * np.cos(2 * (cfg["cm_pitch"] + cfg["mo1_bragg"]))
beam["x"].append(bl.mo1.center[1] - dz / 2) # Mono 1.1
beam["y"].append(
bl.sourceHeight
+ np.tan(2 * cfg["cm_pitch"]) * (bl.mo1.center[1] - dz / 2 - bl.cm.center[1])
)
beam["x"].append(bl.mo1.center[1] + dz / 2) # Mono 1.2
beam["y"].append(
bl.sourceHeight
+ np.tan(2 * cfg["cm_pitch"]) * (bl.mo1.center[1] - dz / 2 - bl.cm.center[1])
+ dy
)
beam["x"].append(bl.fm.center[1]) # FM
beam["y"].append(
bl.sourceHeight
+ np.tan(2 * cfg["cm_pitch"]) * (bl.fm.center[1] - bl.cm.center[1] - dz)
+ dy
)
beam["x"].append(cfg["smpl"]) # Experiment
beam["y"].append(
bl.sourceHeight
+ np.tan(2 * cfg["cm_pitch"]) * (bl.fm.center[1] - bl.cm.center[1] - dz)
+ dy
+ np.tan(2 * (cfg["cm_pitch"] - cfg["fm_rotx"])) * (cfg["smpl"] - bl.fm.center[1])
)
elif cfg["mo1_mode"] == "Pinkbeam":
beam["x"].append(bl.fm.center[1]) # FM
beam["y"].append(
bl.sourceHeight + np.tan(2 * cfg["cm_pitch"]) * (bl.fm.center[1] - bl.cm.center[1])
)
beam["x"].append(cfg["smpl"]) # Experiment
beam["y"].append(
bl.sourceHeight
+ np.tan(2 * cfg["cm_pitch"]) * (bl.fm.center[1] - bl.cm.center[1])
+ np.tan(2 * (cfg["cm_pitch"] - cfg["fm_rotx"])) * (cfg["smpl"] - bl.fm.center[1])
)
dy_fm_ex = beam["y"][-1] - beam["y"][-2]
dz_fm_ex = beam["x"][-1] - beam["x"][-2]
dz_fm_win = bl.ehWindow.center[1] - beam["x"][-2]
h_at_win = beam["y"][-2] + dy_fm_ex / dz_fm_ex * dz_fm_win
# beam["heightWindow"] = h_at_win
return beam
@@ -1,5 +1,6 @@
import os
import re
import numpy as np
from bec_lib import bec_logger
@@ -7,87 +8,107 @@ logger = bec_logger.logger
os.environ["USE_XRT"] = "False"
import debye_bec.bec_widgets.widgets.x01da_parameters as bl
from debye_bec.bec_widgets.widgets.digital_twin.types import SurfaceDict
def calc_surfaces(cfg):
out = {
'cm': {'x': [], 'y': []},
'mo1_1': {'x': [], 'y': []},
'mo1_2': {'x': [], 'y': []},
'fm': {'x': [], 'y': []},
out: SurfaceDict = {
"cm": {"x": [], "y": []},
"mo1_1": {"x": [], "y": []},
"mo1_2": {"x": [], "y": []},
"fm": {"x": [], "y": []},
}
# Collimating mirror
l = 2 * bl.cm.center[1] * np.tan(cfg['v_acc'])/np.sin(cfg['cm_pitch'])
l = 2 * bl.cm.center[1] * np.tan(cfg["v_acc"]) / np.sin(cfg["cm_pitch"])
w1 = 2 * (bl.cm.center[1]-l/2) * np.tan(cfg['h_acc'])
w2 = 2 * (bl.cm.center[1]+l/2) * np.tan(cfg['h_acc'])
w1 = 2 * (bl.cm.center[1] - l / 2) * np.tan(cfg["h_acc"])
w2 = 2 * (bl.cm.center[1] + l / 2) * np.tan(cfg["h_acc"])
index = bl.cm.surface.index(cfg['cm_stripe'])
cen = (bl.cm.limOptX[0][index] + bl.cm.limOptX[1][index]) / 2
index = bl.cm.surface.index(cfg["cm_stripe"])
if cfg['cm_trx'] is not None:
cen = cfg['cm_trx']
out['cm']['x'] = [cen-w1/2, cen-w2/2, cen+w2/2, cen+w1/2]
out['cm']['y'] = [-l/2, l/2, l/2, -l/2]
cen = -cfg["cm_trx"]
out["cm"]["x"] = [cen - w1 / 2, cen - w2 / 2, cen + w2 / 2, cen + w1 / 2]
out["cm"]["y"] = [-l / 2, l / 2, l / 2, -l / 2]
# Monochromator
# calculate height of center of first crystal surface
c = bl.mo1.heightOffset*1/np.sin(cfg['mo1_bragg'])-bl.mo1.rotOffset*1/np.tan(cfg['mo1_bragg'])
e = bl.mo1.xtalGap[0]/np.tan(cfg['mo1_bragg'])-c
c = bl.mo1.heightOffset * 1 / np.sin(cfg["mo1_bragg"]) - bl.mo1.rotOffset * 1 / np.tan(
cfg["mo1_bragg"]
)
e = bl.mo1.xtalGap[0] / np.tan(cfg["mo1_bragg"]) - c
xtal = cfg['mo1_xtal'].translate(str.maketrans('', '', '()')) # Remove brackets from xtal name to conform with parameters
xtal = cfg["mo1_xtal"].translate(
str.maketrans("", "", "()")
) # Remove brackets from xtal name to conform with parameters
index = bl.mo1.xtal.index(xtal)
xtalPos = bl.mo1.xtalOffsetX[index]
xtalLength1 = bl.mo1.xtalLength1[index]
xtalLength2 = bl.mo1.xtalLength2[index]
widthBeam = 2 * bl.mo1.center[1] * np.tan(cfg['h_acc'])
heightBeam = 2 * bl.cm.center[1] * np.tan(cfg['v_acc'])
w = heightBeam / np.sin(cfg['mo1_bragg'])
widthBeam = 2 * bl.mo1.center[1] * np.tan(cfg["h_acc"])
if cfg['mo1_mode'] in 'Monochromatic':
out['mo1_1']['x'] = [xtalPos-widthBeam/2, xtalPos+widthBeam/2, xtalPos+widthBeam/2, xtalPos-widthBeam/2]
out['mo1_1']['y'] = [xtalLength1/2-c-w/2, xtalLength1/2-c-w/2, xtalLength1/2-c+w/2, xtalLength1/2-c+w/2]
out['mo1_2']['x'] = [xtalPos-widthBeam/2, xtalPos+widthBeam/2, xtalPos+widthBeam/2, xtalPos-widthBeam/2]
out['mo1_2']['y'] = [-xtalLength2/2+e-w/2, -xtalLength2/2+e-w/2, -xtalLength2/2+e+w/2, -xtalLength2/2+e+w/2]
else: # Pinkbeam
out['mo1_1']['x'] = []
out['mo1_1']['y'] = []
out['mo1_2']['x'] = []
out['mo1_2']['y'] = []
heightBeam = 2 * bl.cm.center[1] * np.tan(cfg["v_acc"])
w = heightBeam / np.sin(cfg["mo1_bragg"])
if cfg["mo1_mode"] in "Monochromatic":
out["mo1_1"]["x"] = [
xtalPos - widthBeam / 2,
xtalPos + widthBeam / 2,
xtalPos + widthBeam / 2,
xtalPos - widthBeam / 2,
]
out["mo1_1"]["y"] = [
xtalLength1 / 2 - c - w / 2,
xtalLength1 / 2 - c - w / 2,
xtalLength1 / 2 - c + w / 2,
xtalLength1 / 2 - c + w / 2,
]
out["mo1_2"]["x"] = [
xtalPos - widthBeam / 2,
xtalPos + widthBeam / 2,
xtalPos + widthBeam / 2,
xtalPos - widthBeam / 2,
]
out["mo1_2"]["y"] = [
-xtalLength2 / 2 + e - w / 2,
-xtalLength2 / 2 + e - w / 2,
-xtalLength2 / 2 + e + w / 2,
-xtalLength2 / 2 + e + w / 2,
]
else: # Pinkbeam
out["mo1_1"]["x"] = []
out["mo1_1"]["y"] = []
out["mo1_2"]["x"] = []
out["mo1_2"]["y"] = []
# Focusing mirror
if cfg['fm_stripe'] in ('Rh (toroid)', 'Pt (toroid)'):
if cfg["fm_stripe"] in ("Rh (toroid)", "Pt (toroid)"):
surface = bl.fm.surfaceToroid
stripe = re.sub(r'\s*\(.*?\)', '', cfg['fm_stripe']).strip()
stripe = re.sub(r"\s*\(.*?\)", "", cfg["fm_stripe"]).strip()
index = surface.index(stripe)
off = (bl.fm.limOptXToroid[0][index] + bl.fm.limOptXToroid[1][index]) / 2
r = bl.fm.r[index]
else:
surface = bl.fm.surfaceFlat
stripe = re.sub(r'\s*\(.*?\)', '', cfg['fm_stripe']).strip()
stripe = re.sub(r"\s*\(.*?\)", "", cfg["fm_stripe"]).strip()
index = surface.index(stripe)
off = (bl.fm.limOptXFlat[0][index] + bl.fm.limOptXFlat[1][index]) / 2
r = bl.fm.r[index]
if cfg['fm_trx'] is not None:
off = cfg['fm_trx']
off = -cfg["fm_trx"]
widthBeam = 2 * bl.fm.center[1] * np.tan(cfg['h_acc'])
widthBeam = 2 * bl.fm.center[1] * np.tan(cfg["h_acc"])
if cfg['fm_stripe'] in ('Rh (toroid)', 'Pt (toroid)'):
if cfg["fm_stripe"] in ("Rh (toroid)", "Pt (toroid)"):
l = heightBeam/np.sin(cfg['fm_rotx'])
alpha = np.arccos(1-widthBeam**2/(2*r**2))
h = r-(r*np.cos(alpha/2))
z = h/np.tan(cfg['fm_rotx'])
l = heightBeam / np.sin(cfg["fm_rotx"])
alpha = np.arccos(1 - widthBeam**2 / (2 * r**2))
h = r - (r * np.cos(alpha / 2))
z = h / np.tan(cfg["fm_rotx"])
x = [off-widthBeam/2, off-widthBeam/2]
y = [l/2-z/2, -l/2-z/2]
x = [off - widthBeam / 2, off - widthBeam / 2]
y = [l / 2 - z / 2, -l / 2 - z / 2]
# logger.info(f'stripe: {cfg["fm_stripe"]}')
# logger.info(f'fm_rotx: {cfg["fm_rotx"]}')
@@ -98,34 +119,34 @@ def calc_surfaces(cfg):
res = 20
xElipse = np.linspace(0, np.pi, res)
yElipse = np.linspace(0, np.pi, res)
xElipse = [-widthBeam/2*np.cos(i)+off for i in xElipse]
yElipse = [widthBeam*np.sin(i)*z/widthBeam-l/2-z/2 for i in yElipse]
xElipse = [-widthBeam / 2 * np.cos(i) + off for i in xElipse]
yElipse = [widthBeam * np.sin(i) * z / widthBeam - l / 2 - z / 2 for i in yElipse]
x.extend(xElipse)
y.extend(yElipse)
x.extend([off+widthBeam/2, off+widthBeam/2])
y.extend([-l/2-z/2, l/2-z/2])
x.extend([off + widthBeam / 2, off + widthBeam / 2])
y.extend([-l / 2 - z / 2, l / 2 - z / 2])
res = 50
xElipse = np.linspace(np.pi, 0, res)
yElipse = np.linspace(np.pi, 0, res)
xElipse = [-widthBeam/2*np.cos(i)+off for i in xElipse]
yElipse = [widthBeam*np.sin(i)*z/widthBeam+l/2-z/2 for i in yElipse]
xElipse = [-widthBeam / 2 * np.cos(i) + off for i in xElipse]
yElipse = [widthBeam * np.sin(i) * z / widthBeam + l / 2 - z / 2 for i in yElipse]
x.extend(xElipse)
y.extend(yElipse)
out['fm']['x'] = x
out['fm']['y'] = y
out["fm"]["x"] = x
out["fm"]["y"] = y
else: # flat surface, no toroid
l = heightBeam/np.sin(cfg['fm_rotx'])
else: # flat surface, no toroid
l = heightBeam / np.sin(cfg["fm_rotx"])
w1 = 2 * (bl.fm.center[1]-l/2) * np.tan(cfg['h_acc'])
w2 = 2 * (bl.fm.center[1]+l/2) * np.tan(cfg['h_acc'])
w1 = 2 * (bl.fm.center[1] - l / 2) * np.tan(cfg["h_acc"])
w2 = 2 * (bl.fm.center[1] + l / 2) * np.tan(cfg["h_acc"])
out['fm']['x'] = [off-w1/2, off+w1/2, off+w2/2, off-w2/2]
out['fm']['y'] = [-l/2, -l/2, l/2, l/2]
out["fm"]["x"] = [off - w1 / 2, off + w1 / 2, off + w2 / 2, off - w2 / 2]
out["fm"]["y"] = [-l / 2, -l / 2, l / 2, l / 2]
return out
@@ -1,13 +1,15 @@
import re
import numpy as np
from scipy.interpolate import UnivariateSpline
from xrt.backends.raycing.physconsts import CHeVcm, AVOGADRO
from bec_lib import bec_logger
from scipy.interpolate import UnivariateSpline
from xrt.backends.raycing.physconsts import AVOGADRO, CHeVcm
import debye_bec.bec_widgets.widgets.x01da_parameters as bl
logger = bec_logger.logger
def sldi_gap_to_acc(sldi_gapx, sldi_gapy):
d1 = bl.feSlits.center1[1]
d2 = bl.feSlits.center2[1]
@@ -18,6 +20,7 @@ def sldi_gap_to_acc(sldi_gapx, sldi_gapy):
# v_acc = np.tan(sldi_gapy / (2 * d1))
return h_acc, v_acc
def cm_trx_to_stripe(cm_trx):
cm_stripe = None
for name, low, high in zip(bl.cm.surface, bl.cm.limOptX[0], bl.cm.limOptX[1]):
@@ -25,28 +28,47 @@ def cm_trx_to_stripe(cm_trx):
cm_stripe = name
return cm_stripe
def cm_stripe_to_trx(cm_stripe):
for name, low, high in zip(bl.cm.surface, bl.cm.limOptX[0], bl.cm.limOptX[1]):
if cm_stripe == name:
return -(low + high) / 2
return 0
def fm_trx_to_stripe(fm_trx):
fm_stripe = None
for name, low, high in zip(bl.fm.surfaceFlat, bl.fm.limOptXFlat[1], bl.fm.limOptXFlat[0]):
if low <= fm_trx <= high:
fm_stripe = name + ' (flat)'
fm_stripe = name + " (flat)"
for name, low, high in zip(bl.fm.surfaceToroid, bl.fm.limOptXToroid[1], bl.fm.limOptXToroid[0]):
if low <= fm_trx <= high:
fm_stripe = name + ' (toroid)'
fm_stripe = name + " (toroid)"
return fm_stripe
def fm_stripe_to_trx(fm_stripe):
for name, low, high in zip(bl.fm.surfaceFlat, bl.fm.limOptXFlat[1], bl.fm.limOptXFlat[0]):
if fm_stripe == name + " (flat)":
return (low + high) / 2
for name, low, high in zip(bl.fm.surfaceToroid, bl.fm.limOptXToroid[1], bl.fm.limOptXToroid[0]):
if fm_stripe == name + " (toroid)":
return -(low + high) / 2
return 0
def mo1_energy_resolution(xtal, energy):
index = bl.mo1.xtal.index(xtal)
crystal = bl.mo1.material1[index]
dtheta = np.linspace(-30, 90, 601)
theta = crystal.get_Bragg_angle(energy) + dtheta * 1e-6
refl = np.abs(crystal.get_amplitude(energy, np.sin(theta))[0])**2 # single crystal
refl = np.abs(crystal.get_amplitude(energy, np.sin(theta))[0]) ** 2 # single crystal
refl2 = refl**2 # DCM with parallel crystals
# FWHM of the DCM curve
spline = UnivariateSpline(dtheta, refl2 - refl2.max()/2, s=0)
spline = UnivariateSpline(dtheta, refl2 - refl2.max() / 2, s=0)
r1, r2 = spline.roots()
fwhm_rad = (r2 - r1) * 1e-6 # µrad → rad
@@ -61,85 +83,88 @@ def mo1_energy_resolution(xtal, energy):
return dE
def cm_reflectivity(cm_stripe, cm_pitch, energy):
index = bl.cm.surface.index(cm_stripe)
rs, rp = bl.cm.material[index].get_amplitude(
energy,
np.sin(cm_pitch)
)[0:2]
refl = abs(rs)**2
rs, rp = bl.cm.material[index].get_amplitude(energy, np.sin(cm_pitch))[0:2]
refl = abs(rs) ** 2
return refl
def fm_reflectivity(fm_stripe, fm_pitch, energy):
if fm_stripe in ('Rh (toroid)', 'Pt (toroid)'):
if fm_stripe in ("Rh (toroid)", "Pt (toroid)"):
surface = bl.fm.surfaceToroid
material = bl.fm.materialToroid
stripe = re.sub(r'\s*\(.*?\)', '', fm_stripe).strip()
stripe = re.sub(r"\s*\(.*?\)", "", fm_stripe).strip()
index = surface.index(stripe)
else:
surface = bl.fm.surfaceFlat
material = bl.fm.materialFlat
stripe = re.sub(r'\s*\(.*?\)', '', fm_stripe).strip()
stripe = re.sub(r"\s*\(.*?\)", "", fm_stripe).strip()
index = surface.index(stripe)
rs, rp = material[index].get_amplitude(
energy,
np.sin(fm_pitch)
)[0:2]
refl = abs(rs)**2
rs, rp = material[index].get_amplitude(energy, np.sin(fm_pitch))[0:2]
refl = abs(rs) ** 2
return refl
def mo1_bragg_angle(mo_mode, d_spacing, energy, cm_pitch):
H = 6.62606957E-34
E = 1.602176634E-19
H = 6.62606957e-34
E = 1.602176634e-19
C = 299792458
wl = C * H / (E * energy)
val = wl / (2 * d_spacing * 1e-10)
bragg_angle = 0
if val > -1 and val < 1:
bragg_angle = np.asin(val)
if mo_mode in 'Monochromatic':
if mo_mode in "Monochromatic":
# Add 2x CM pitch to the bragg angle
bragg_angle_cor = ((2 * cm_pitch) + bragg_angle)
elif mo_mode in 'Pinkbeam':
bragg_angle_cor = (2 * cm_pitch) + bragg_angle
elif mo_mode in "Pinkbeam":
# Align xtal surfaces parallel to beam
bragg_angle_cor = (2 * cm_pitch)
bragg_angle_cor = 2 * cm_pitch
return bragg_angle, bragg_angle_cor
def fm_ideal_pitch(fm_focus, fm_stripe, smpl, sldi_hacc=None, sldi_vacc=None, fm_focx=None, fm_focy=None):
p = bl.fm.center[1] # posFM
q = smpl - bl.fm.center[1] # dist posFM to posEX
if fm_focus in 'Defocused':
a = 2 * np.tan(sldi_hacc) * bl.fm.center[1] # Beam width at focusing mirror
b = 2 * np.tan(sldi_vacc) * bl.cm.center[1] # Beam height at focusing mirror (collimated beam)
def fm_ideal_pitch(
fm_focus, fm_stripe, smpl, sldi_hacc=None, sldi_vacc=None, fm_focx=None, fm_focy=None
):
p = bl.fm.center[1] # posFM
q = smpl - bl.fm.center[1] # dist posFM to posEX
if fm_focus in "Defocused":
a = 2 * np.tan(sldi_hacc) * bl.fm.center[1] # Beam width at focusing mirror
b = (
2 * np.tan(sldi_vacc) * bl.cm.center[1]
) # Beam height at focusing mirror (collimated beam)
x = fm_focx
y = fm_focy
qx = q + x * p / a
qy = q + y * p / b
f = (p * qx) / (p + qx) # focal length
else: # Calculate for focused beam on sample in "manual" and "focused" mode
f = (p * qx) / (p + qx) # focal length
else: # Calculate for focused beam on sample in "manual" and "focused" mode
qy = None
f = (p * q) / (p + q) # focal length
f = (p * q) / (p + q) # focal length
pitch = 0
if 'Rh' in fm_stripe:
pitch = np.arcsin(bl.fm.r[0]/(2*f))# ideal pitch for FM
if 'Pt' in fm_stripe:
pitch = np.arcsin(bl.fm.r[1]/(2*f)) # ideal pitch for FM
if "Rh" in fm_stripe:
pitch = np.arcsin(bl.fm.r[0] / (2 * f)) # ideal pitch for FM
if "Pt" in fm_stripe:
pitch = np.arcsin(bl.fm.r[1] / (2 * f)) # ideal pitch for FM
return pitch, qy
def cm_critical_angle(cm_stripe, energy):
if cm_stripe in 'Si':
if cm_stripe in "Si":
stripe = bl.stripeSi
elif cm_stripe in 'Pt':
elif cm_stripe in "Pt":
stripe = bl.stripePt
elif cm_stripe in 'Rh':
elif cm_stripe in "Rh":
stripe = bl.stripeRh
else:
raise Exception(f'Stripe {stripe} not found in beamline parameters!')
w = CHeVcm/100/energy # convert energy [eV] to wavelength [m]
raise Exception(f"Stripe {stripe} not found in beamline parameters!")
w = CHeVcm / 100 / energy # convert energy [eV] to wavelength [m]
# Calculate critical angle for mirror
f1 = stripe.elements[0].Z + np.real(stripe.elements[0].get_f1f2(energy))
numberDensity = stripe.rho*1e3*AVOGADRO/(stripe.elements[0].mass/1e3)
criticalAngle = np.sqrt(numberDensity*2.8179e-15*w**2*f1/np.pi)
numberDensity = stripe.rho * 1e3 * AVOGADRO / (stripe.elements[0].mass / 1e3)
criticalAngle = np.sqrt(numberDensity * 2.8179e-15 * w**2 * f1 / np.pi)
return criticalAngle
@@ -148,23 +173,24 @@ def mirror_surface_geometries(mirror):
surface = bl.cm.surface
limOptX = bl.cm.limOptX
limOptY = bl.cm.limOptY
elif mirror in 'fm_toroid':
elif mirror in "fm_toroid":
surface = bl.fm.surfaceToroid
limOptX = bl.fm.limOptXToroid
limOptY = bl.fm.limOptYToroid
elif mirror in 'fm_flat':
elif mirror in "fm_flat":
surface = bl.fm.surfaceFlat
limOptX = bl.fm.limOptXFlat
limOptY = bl.fm.limOptYFlat
else:
raise ValueError(f'Requested mirror {mirror} not available!')
raise ValueError(f"Requested mirror {mirror} not available!")
geom = {}
for sf, lx, hx, ly, hy in zip(surface, limOptX[0], limOptX[1], limOptY[0], limOptY[1]):
geom[sf] = (lx, ly, hx-lx, hy-ly)
geom[sf] = (lx, ly, hx - lx, hy - ly)
return geom
def mo_surface_geometries(mo, plane):
if mo in 'mo1':
if mo in "mo1":
xtal = bl.mo1.xtal
xtal_width = bl.mo1.xtalWidth
xtal_offset_x = bl.mo1.xtalOffsetX
@@ -173,34 +199,42 @@ def mo_surface_geometries(mo, plane):
else:
xtal_length = bl.mo1.xtalLength2
else:
raise ValueError(f'Requested mono {mo} not available!')
raise ValueError(f"Requested mono {mo} not available!")
geom = {}
for sf, w, offx, length in zip(xtal, xtal_width, xtal_offset_x, xtal_length):
geom[sf] = (offx-w/2, -length/2, w, length)
geom[sf] = (offx - w / 2, -length / 2, w, length)
return geom
def wall_geometries():
geom = []
for i, _ in enumerate(bl.walls.start):
geom.append([
bl.walls.start[i],
bl.walls.height[i][0],
bl.walls.end[i] - bl.walls.start[i],
bl.walls.height[i][1] - bl.walls.height[i][0],
])
geom.append(
[
bl.walls.start[i],
bl.walls.height[i][0],
bl.walls.end[i] - bl.walls.start[i],
bl.walls.height[i][1] - bl.walls.height[i][0],
]
)
return geom
def pipe_geometries():
pipes = []
for i, _ in enumerate(bl.vacuum_pipes.center):
top = bl.vacuum_pipes.center[i] + bl.vacuum_pipes.diameter[i]/2 + bl.sourceHeight
bottom = bl.vacuum_pipes.center[i] - bl.vacuum_pipes.diameter[i]/2 + bl.sourceHeight
pipes.append({
'x': np.array([bl.vacuum_pipes.start[i], bl.vacuum_pipes.end[i]]),
'y': np.array([top, top])
})
pipes.append({
'x': np.array([bl.vacuum_pipes.start[i], bl.vacuum_pipes.end[i]]),
'y': np.array([bottom, bottom])
})
top = bl.vacuum_pipes.center[i] + bl.vacuum_pipes.diameter[i] / 2 + bl.sourceHeight
bottom = bl.vacuum_pipes.center[i] - bl.vacuum_pipes.diameter[i] / 2 + bl.sourceHeight
pipes.append(
{
"x": np.array([bl.vacuum_pipes.start[i], bl.vacuum_pipes.end[i]]),
"y": np.array([top, top]),
}
)
pipes.append(
{
"x": np.array([bl.vacuum_pipes.start[i], bl.vacuum_pipes.end[i]]),
"y": np.array([bottom, bottom]),
}
)
return pipes
@@ -4,18 +4,19 @@ Digital Twin: Custom BEC widget to support the beamline alignment.
import sys
from pathlib import Path
from typing import Literal
import numpy as np
import yaml
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.error_popups import SafeSlot
# pylint: disable=E0611
from qtpy.QtCore import Qt, QTimer
# pylint: disable=E0611
from qtpy.QtWidgets import (
QApplication,
QDialog,
@@ -33,9 +34,11 @@ from debye_bec.bec_widgets.widgets.digital_twin.calc_surfaces import calc_surfac
from debye_bec.bec_widgets.widgets.digital_twin.calc_varia import (
cm_critical_angle,
cm_reflectivity,
cm_stripe_to_trx,
cm_trx_to_stripe,
fm_ideal_pitch,
fm_reflectivity,
fm_stripe_to_trx,
fm_trx_to_stripe,
mo1_bragg_angle,
mo1_energy_resolution,
@@ -74,21 +77,21 @@ class DigitalTwin(BECWidget, QWidget):
self.input_layout = QVBoxLayout(self.input_widget)
self.input = InputPanel()
self.settings = SettingsPanel()
self.input_layout.addWidget(self.input) # type: ignore
self.input_layout.addWidget(self.settings) # type: ignore
self.input_layout.addWidget(self.input)
self.input_layout.addWidget(self.settings)
self.plot_widget = QWidget()
self.plot_layout = QVBoxLayout(self.plot_widget)
self.sideview_plot = SideviewPlot()
self.surface_plots = SurfacePlots()
self.plot_layout.addWidget(self.sideview_plot) # type: ignore
self.plot_layout.addWidget(self.surface_plots) # type: ignore
self.plot_layout.addWidget(self.sideview_plot)
self.plot_layout.addWidget(self.surface_plots)
self.mover = MoverPanel(self.dev)
self.root_layout.addWidget(self.input_widget, alignment=Qt.AlignTop) # type: ignore
self.root_layout.addWidget(self.plot_widget, alignment=Qt.AlignTop) # type: ignore
self.root_layout.addWidget(self.mover, alignment=Qt.AlignTop)
self.root_layout.addWidget(self.input_widget, alignment=Qt.AlignmentFlag.AlignTop)
self.root_layout.addWidget(self.plot_widget, alignment=Qt.AlignmentFlag.AlignTop)
self.root_layout.addWidget(self.mover, alignment=Qt.AlignmentFlag.AlignTop)
self.setLayout(self.root_layout)
self.setWindowTitle("Digital Twin")
@@ -126,7 +129,13 @@ class DigitalTwin(BECWidget, QWidget):
self._timer.timeout.connect(self.calc_reality)
self._timer.start()
def apply_theme(self, theme):
def apply_theme(self, theme: Literal["dark", "light"]):
"""
Apply the theme
Args:
theme (str): Theme, either "dark" or "light"
"""
self.sideview_plot.apply_theme(theme)
self.surface_plots.apply_theme(theme)
self.mover.apply_theme(theme)
@@ -176,10 +185,12 @@ class DigitalTwin(BECWidget, QWidget):
top = QHBoxLayout()
icon = QLabel()
icon_pixmap = (
QApplication.style().standardIcon(QStyle.SP_MessageBoxWarning).pixmap(48, 48)
QApplication.style()
.standardIcon(QStyle.StandardPixmap.SP_MessageBoxWarning)
.pixmap(48, 48)
)
icon.setPixmap(icon_pixmap)
icon.setAlignment(Qt.AlignTop)
icon.setAlignment(Qt.AlignmentFlag.AlignTop)
top.addWidget(icon)
text = QLabel(
@@ -187,13 +198,13 @@ class DigitalTwin(BECWidget, QWidget):
+ "Reload the config with the correct devices."
)
text.setWordWrap(True)
text.setAlignment(Qt.AlignTop)
text.setAlignment(Qt.AlignmentFlag.AlignTop)
top.addWidget(text, stretch=1)
layout.addLayout(top)
info = QLabel("Missing devices:\n" + ", ".join(missing))
info.setWordWrap(True)
info.setAlignment(Qt.AlignTop)
info.setAlignment(Qt.AlignmentFlag.AlignTop)
layout.addWidget(info)
layout.addStretch()
@@ -209,9 +220,10 @@ class DigitalTwin(BECWidget, QWidget):
dialog.setLayout(layout)
dialog.show()
info.setMinimumHeight(info.heightForWidth(info.width()))
if dialog.exec_() == QDialog.Rejected:
QApplication.instance().exit(0)
# sys.exit(0)
if dialog.exec_() == QDialog.DialogCode.Rejected:
app = QApplication.instance()
if app is not None:
app.exit(0)
if reload:
self._timer.start()
@@ -255,6 +267,9 @@ class DigitalTwin(BECWidget, QWidget):
self.calc_fm_ideal_pitch()
case "fm_focy":
self.calc_fm_ideal_pitch()
case "fm_rotx":
self.calc_fm_reflectivity()
self.calc_cm_fm_harm_suppr()
case "fm_stripe":
self.calc_fm_reflectivity()
self.calc_cm_fm_harm_suppr()
@@ -265,7 +280,7 @@ class DigitalTwin(BECWidget, QWidget):
self.calc_assistant_sideview()
self.calc_assistant_surfaces()
def get_assistant_config(self):
def get_assistant_config(self, apply_offset: bool = False):
fm_focus = self.input.fm_focus.currentText()
if fm_focus in "Manual":
fm_rotx = self.input.fm_rotx.value()
@@ -277,23 +292,54 @@ class DigitalTwin(BECWidget, QWidget):
fm_rotx = self.input.fm_rotx_ideal.value()
fm_qy = self.qy
config = { # Config in SI units!
cm_stripe = self.input.cm_stripe.currentText()
cm_trx = cm_stripe_to_trx(cm_stripe)
fm_stripe = self.input.fm_stripe.currentText()
fm_trx = fm_stripe_to_trx(fm_stripe)
config = {
"energy": self.input.energy.value(),
"h_acc": self.input.sldi_hacc.value() * 1e-3,
"v_acc": self.input.sldi_vacc.value() * 1e-3,
"cm_pitch": -self.input.cm_pitch.value() * 1e-3,
"cm_stripe": self.input.cm_stripe.currentText(),
"cm_trx": None,
"h_acc": self.input.sldi_hacc.value(),
"v_acc": self.input.sldi_vacc.value(),
"cm_pitch": -self.input.cm_pitch.value(),
"cm_stripe": cm_stripe,
"cm_trx": cm_trx,
"mo1_mode": self.input.mo1_mode.currentText(),
"mo1_xtal": self.input.mo1_xtal.currentText(),
"mo1_bragg": self.bragg_angle,
"fm_rotx": -fm_rotx * 1e-3,
"fm_stripe": self.input.fm_stripe.currentText(),
"fm_trx": None,
"fm_rotx": -fm_rotx,
"fm_stripe": fm_stripe,
"fm_trx": fm_trx,
"fm_qy": fm_qy,
"fm_gain_height": 1,
"smpl": self.input.smpl.value(),
}
# Apply offsets
if apply_offset:
for axis, _ in config.items():
if axis in self.offsets:
axis_offsets = self.offsets[axis]
logger.info(f"Axis: {axis}")
if "modifier" in axis_offsets and "offset" in axis_offsets:
for idx, rng in enumerate(axis_offsets["modifier"]["range"]):
logger.info(f"rng: {rng}")
logger.info(f'value: {config[axis_offsets["modifier"]["axis"]]}')
if rng[0] < config[axis_offsets["modifier"]["axis"]] < rng[1]:
logger.info(f'offset: {axis_offsets["offset"][idx]}')
# logger.info(f"axis_data before: {axis_data}")
config[axis] += axis_offsets["offset"][idx]
# logger.info(f"axis_data after: {axis_data}")
break
elif "offset" in axis_offsets:
config[axis] += axis_offsets["offset"]
# Convert to SI units!
config["h_acc"] *= 1e-3
config["v_acc"] *= 1e-3
config["cm_pitch"] *= 1e-3
config["fm_rotx"] *= 1e-3
# logger.info(f'Config created: {config}')
return config
@@ -321,13 +367,13 @@ class DigitalTwin(BECWidget, QWidget):
"v_acc": v_acc,
"cm_pitch": -cm_pitch * 1e-3,
"cm_stripe": cm_stripe,
"cm_trx": -cm_trx,
"cm_trx": cm_trx,
"mo1_mode": mo1_mode,
"mo1_xtal": mo1_bragg["mo1_bragg_crystal_current_xtal_string"]["value"],
"mo1_bragg": mo1_bragg["mo1_bragg_angle"]["value"] / 180 * np.pi,
"fm_rotx": -fm_rotx_real * 1e-3,
"fm_stripe": fm_stripe,
"fm_trx": -fm_trx,
"fm_trx": fm_trx,
"fm_gain_height": 1,
"smpl": smpl,
}
@@ -396,7 +442,7 @@ class DigitalTwin(BECWidget, QWidget):
pos["ot_es1_trz"] = self.dev.ot_es1_trz.read(cached=True)["ot_es1_trz"]["value"]
# Removing offsets
for axis, value in pos.items():
for axis, _ in pos.items():
if axis in self.offsets:
axis_offsets = self.offsets[axis]
if "modifier" in axis_offsets and "offset" in axis_offsets:
@@ -471,9 +517,8 @@ class DigitalTwin(BECWidget, QWidget):
def calc_reality(self):
config = self.get_reality_config()
beam = calc_sideview(config)
data = {"x": beam["x"], "y": beam["y"]}
self.sideview_plot.update_curves("reality", data)
data = calc_sideview(config)
self.sideview_plot.update_curves("reality", data=data)
# logger.info('Calc reality surfaces')
surfaces = calc_surfaces(config)
self.surface_plots.update_surfaces(scene="reality", data=surfaces)
@@ -517,8 +562,8 @@ class DigitalTwin(BECWidget, QWidget):
)
def calc_assistant_sideview(self):
beam = calc_sideview(self.get_assistant_config())
data = {"x": beam["x"], "y": beam["y"]}
config = self.get_assistant_config(apply_offset=True)
data = calc_sideview(config)
self.sideview_plot.update_curves("assistant", data)
def calc_assistant_surfaces(self):
@@ -581,11 +626,11 @@ class DigitalTwin(BECWidget, QWidget):
"mo1_bragg_crystal_d_spacing_si311"
]["value"]
else:
raise Exception(f"Invalid xtal selection: {xtal}")
raise ValueError(f"Invalid xtal selection: {xtal}")
cm_pitch = -self.dev.cm_rotx.read(cached=True)["cm_rotx"]["value"] * 1e-3
mo1_mode = self.input.mo1_mode.currentText()
energy = self.input.energy.value()
theta, theta_cor = mo1_bragg_angle(mo1_mode, d_spacing, energy, cm_pitch)
theta, _ = mo1_bragg_angle(mo1_mode, d_spacing, energy, cm_pitch)
self.bragg_angle = theta
self.input.mo1_bragg_angle.setValue(theta / np.pi * 180)
@@ -599,7 +644,7 @@ class DigitalTwin(BECWidget, QWidget):
self.input.mo1_bragg_angle.setVisible(False)
self.input.mo1_eres.setVisible(False)
def calc_fm_ideal_pitch(self): # TODO: What happens if the flats are selected?
def calc_fm_ideal_pitch(self):
fm_focus = self.input.fm_focus.currentText()
fm_stripe = self.input.fm_stripe.currentText()
smpl = self.input.smpl.value()
@@ -620,9 +665,6 @@ class DigitalTwin(BECWidget, QWidget):
if __name__ == "__main__":
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.colors import apply_theme
app = QApplication(sys.argv)
apply_theme("light")
dispatcher = BECDispatcher(gui_id="digital_twin")
@@ -15,7 +15,7 @@ from debye_bec.bec_widgets.widgets.qt_widgets import (
class InputPanel(QWidget):
"""Right-side control panel: input field, indicator, send, recording."""
"""Panel for user inputs of the digital twin widget"""
def __init__(self, parent=None):
super().__init__(parent)
@@ -1,25 +1,19 @@
import random
"""Move widget to display an axis and also move it through BEC"""
import threading
import time
from typing import Literal, Optional
from bec_lib import bec_logger
# import qtawesome as qta
from bec_qthemes import material_icon
from bec_widgets.utils.colors import get_accent_colors
from qtpy.QtCore import Property, QObject, QPropertyAnimation, Qt, QThread, Signal
# pylint: disable=E0611
from qtpy.QtCore import Property # type: ignore[attr-defined]
from qtpy.QtCore import Signal # type: ignore[attr-defined]
from qtpy.QtCore import QObject, QPropertyAnimation, Qt, QThread
from qtpy.QtGui import QTransform
from qtpy.QtWidgets import (
QApplication,
QDoubleSpinBox,
QFrame,
QGroupBox,
QHBoxLayout,
QLabel,
QPushButton,
QVBoxLayout,
QWidget,
)
from qtpy.QtWidgets import QApplication, QHBoxLayout, QLabel, QPushButton, QWidget
from debye_bec.devices.absorber import STATUS as ABS_STATUS
@@ -27,6 +21,8 @@ logger = bec_logger.logger
class Status:
"""Status class for the axis"""
IN_POSITION = "in_position" # green mdi.check-circle
NOT_IN_POSITION = "not_in_position" # orange mdi.close-circle
MOVING = "moving" # blue mdi.loading (spinning)
@@ -55,10 +51,10 @@ class StatusIcon(QWidget):
self._label = QLabel(self)
self._label.setFixedSize(self.ICON_SIZE, self.ICON_SIZE)
self._label.setAlignment(Qt.AlignCenter)
self._label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.setFixedSize(self.ICON_SIZE, self.ICON_SIZE)
self._spin_anim = QPropertyAnimation(self, b"rotation")
self._spin_anim = QPropertyAnimation(self, b"rotation") # type: ignore[call-arg]
self._spin_anim.setStartValue(0)
self._spin_anim.setEndValue(360)
self._spin_anim.setDuration(1000)
@@ -67,19 +63,42 @@ class StatusIcon(QWidget):
self.set_status(Status.NOT_IN_POSITION)
def get_rotation(self):
"""Return the current rotation angle in degrees."""
return self._rotation
def set_rotation(self, angle):
def set_rotation(self, angle: float):
"""
Set the rotation angle and update the displayed pixmap.
Rotates the current base pixmap around its center point using a smooth
transformation. Has no effect on the display if no base pixmap is set.
Args:
angle (float): Rotation angle in degrees, clockwise.
"""
self._rotation = angle
if self._current_pixmap_base is not None:
cx = self._current_pixmap_base.width() / 2
cy = self._current_pixmap_base.height() / 2
t = QTransform().translate(cx, cy).rotate(angle).translate(-cx, -cy)
self._label.setPixmap(self._current_pixmap_base.transformed(t, Qt.SmoothTransformation))
self._label.setPixmap(
self._current_pixmap_base.transformed(t, Qt.TransformationMode.SmoothTransformation)
)
rotation = Property(float, get_rotation, set_rotation)
rotation = Property(float, get_rotation, set_rotation) # type: ignore[call-arg]
def set_status(self, status: str):
"""
Update the widget's status and refresh the displayed icon accordingly.
Looks up the icon name and color associated with the given status from
``_ICON_MAP``, renders a new pixmap, and starts or stops the spin
animation depending on whether the status is ``Status.MOVING``. Returns
early without any updates if the status has not changed.
Args:
status (str): The new status value. Must be a key in ``_ICON_MAP``.
"""
if status == self._status:
return
self._status = status
@@ -115,9 +134,11 @@ class MotionWorker(QObject):
self._stop_flag = threading.Event()
def stop(self):
"""Sets the stop flag"""
self._stop_flag.set()
def run(self):
"""Prepares the movement based on the axis (motor)"""
match self.motor:
case "sldi_gapx" | "sldi_gapy" | "sldi_centerx" | "sldi_centery":
self.motion()
@@ -214,7 +235,7 @@ class MotionWorker(QObject):
case _:
logger.warning(f"Motor {self.motor} not integrated in digital twin!")
def motion(self, abs_closed=False, relative=False, rb=None, surveyed_axes=None):
def motion(self, abs_closed: bool = False, relative: bool = False, rb=None, surveyed_axes=None):
"""
Moves an axis while surverying a set of axes (if set).
Example surveyed_axes:
@@ -226,7 +247,8 @@ class MotionWorker(QObject):
if abs_closed:
if self.dev.abs.status.get() == ABS_STATUS.OPEN:
status = self.dev.abs.close()
# TODO Set timeout to 0.001 and check if it actually raises (it should not start motion).
# TODO Set timeout to 0.001 and check if it actually raises
# (it should not start motion).
# Check of behavior of digital twin afterwards.
status.wait(timeout=5)
if surveyed_axes is not None:
@@ -259,7 +281,7 @@ class MotionWorker(QObject):
self.dev[self.motor].stop()
self.error.emit(1)
break
self.finished.emit(True)
self.finished.emit()
class MoveWidget(QWidget):
@@ -326,7 +348,13 @@ class MoveWidget(QWidget):
self.apply_theme()
def apply_theme(self, theme=None):
def apply_theme(self, theme: Optional[Literal["dark", "light"]] = None):
"""
Apply the theme
Args:
theme (Optional[str]): Theme, either "dark", "light", or None. Defaults to None.
"""
if theme is None:
app = QApplication.instance()
theme = app.theme.theme # type: ignore
@@ -342,14 +370,17 @@ class MoveWidget(QWidget):
if self.btn_mode == "start":
self.btn_action.setStyleSheet(
f"QPushButton {{background-color: {get_accent_colors().success.name()}; color: white;}}"
"QPushButton "
+ f"{{background-color: {get_accent_colors().success.name()}; color: white;}}"
)
else:
self.btn_action.setStyleSheet(
f"QPushButton {{background-color: {get_accent_colors().emergency.name()}; color: white;}}"
"QPushButton "
+ f"{{background-color: {get_accent_colors().emergency.name()}; color: white;}}"
)
def set_target(self, target):
"""Change the target value in the ui"""
self.target = target
text = f"{target:.{int(self.decimals)}f}"
if self.unit is not None:
@@ -358,6 +389,7 @@ class MoveWidget(QWidget):
self._on_target_or_fb_changed()
def set_feedback(self, fb):
"""Change the feedback value in the ui"""
if self.status != Status.MOVING:
self.fb = fb
text = f"{fb:.{int(self.decimals)}f}"
@@ -367,19 +399,23 @@ class MoveWidget(QWidget):
self._on_target_or_fb_changed()
def _apply_button_style(self, mode: str):
"""Apply a button style depending on if the button shows start or stop"""
self.btn_mode = mode
if mode == "start":
self.btn_action.setText("Move")
self.btn_action.setStyleSheet(
f"QPushButton {{background-color: {get_accent_colors().success.name()}; color: white;}}"
"QPushButton "
+ f"{{background-color: {get_accent_colors().success.name()}; color: white;}}"
)
else: # stop
self.btn_action.setText("Stop")
self.btn_action.setStyleSheet(
f"QPushButton {{background-color: {get_accent_colors().emergency.name()}; color: white;}}"
"QPushButton "
+ f"{{background-color: {get_accent_colors().emergency.name()}; color: white;}}"
)
def _set_status(self, status: str):
"""Set the current status icon in the ui"""
self.status = status
self.status_icon.set_status(status)
@@ -393,12 +429,14 @@ class MoveWidget(QWidget):
self._set_status(Status.NOT_IN_POSITION)
def _on_button_clicked(self):
"""Starts or stops motion depending on current situation"""
if self._thread and self._thread.isRunning():
self._stop_motion()
else:
self._start_motion()
def _start_motion(self):
"""Start a motion"""
target = self.target
if abs(target - self.fb) <= self.deadband:
self._set_status(Status.IN_POSITION)
@@ -422,21 +460,25 @@ class MoveWidget(QWidget):
self._thread.start()
def _on_error(self):
"""Called when an error occurs"""
self._set_status(Status.ERROR)
self._apply_button_style("start")
def _stop_motion(self):
"""Attempts to stop the motion"""
if self._worker:
self._worker.stop()
def _on_position_changed(self, pos: float):
"""Change the feedback value in the ui"""
self.fb = pos
text = f"{pos:.{int(self.decimals)}f}"
if self.unit is not None:
text = text + " " + self.unit
self.fb_label.setText(text)
def _on_motion_finished(self, reached: bool):
def _on_motion_finished(self):
"""Finished a movement"""
target = self.target
if self.status not in Status.ERROR:
if abs(self.fb - target) <= self.deadband:
@@ -446,6 +488,7 @@ class MoveWidget(QWidget):
self._apply_button_style("start")
def _cleanup_thread(self):
"""Cleaning up of the mover thread"""
if self._thread:
self._thread.deleteLater()
self._thread = None
@@ -454,6 +497,7 @@ class MoveWidget(QWidget):
self._worker = None
def shutdown(self):
"""Cleaning up of the mover when shutting down the application"""
if self._worker:
self._worker.stop()
if self._thread:
@@ -507,6 +551,12 @@ class AbsorberWidget(QWidget):
layout.addWidget(self.btn_action)
def set_feedback(self, fb: bool):
"""
Displays the status of the absober in the ui
Args:
fb (bool): True will set the button to Open, False to Closed
"""
self.fb = fb
if fb:
self.fb_label.setText("Open")
@@ -516,9 +566,16 @@ class AbsorberWidget(QWidget):
self.fb_label.setStyleSheet(f"QLabel {{color: {get_accent_colors().emergency.name()}}}")
def enable_open(self, enable: bool = False):
"""
Enable or disable the open/close button
Args:
enable (bool): Enables and disables the button
"""
if enable:
self.btn_action.setStyleSheet(
f"QPushButton {{background-color: {get_accent_colors().success.name()}; color: white;}}"
"QPushButton "
+ f"{{background-color: {get_accent_colors().success.name()}; color: white;}}"
)
self.btn_action.setEnabled(True)
else: # disabled
@@ -528,4 +585,5 @@ class AbsorberWidget(QWidget):
self.btn_action.setDisabled(True)
def _on_button_clicked(self):
"""Open absorber"""
self.absorber.open()
@@ -2,6 +2,8 @@
Panel to move an axis to a certain position
"""
from typing import Literal
# pylint: disable=E0611
from qtpy.QtWidgets import QLayout, QVBoxLayout, QWidget
@@ -10,6 +12,7 @@ from debye_bec.bec_widgets.widgets.qt_widgets import Group
class MoverPanel(QWidget):
""" "Panel to move an axis to a certain position"""
def __init__(self, dev, parent=None):
super().__init__(parent)
@@ -215,6 +218,12 @@ class MoverPanel(QWidget):
self._layout.addWidget(self.mover_group)
self._layout.addStretch()
def apply_theme(self, theme):
def apply_theme(self, theme: Literal["dark", "light"]):
"""
Apply the theme
Args:
theme (str): Theme, either "dark" or "light"
"""
for widget in self.mover_widgets:
widget.apply_theme(theme)
@@ -2,6 +2,8 @@
Two plot classes to plot side-view and surface-view
"""
from typing import Literal, Optional, TypedDict, cast
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger
@@ -19,6 +21,7 @@ from debye_bec.bec_widgets.widgets.digital_twin.calc_varia import (
pipe_geometries,
wall_geometries,
)
from debye_bec.bec_widgets.widgets.digital_twin.types import DataDict, SurfaceDict
from debye_bec.bec_widgets.widgets.qt_widgets import Group
logger = bec_logger.logger
@@ -31,7 +34,7 @@ class SurfacePlots(QWidget):
super().__init__(parent=parent)
self._layout = QHBoxLayout(self)
self.surfaces = {
self.surfaces: dict[str, SurfaceDict] = {
"assistant": {
"cm": {"x": [], "y": []},
"mo1_1": {"x": [], "y": []},
@@ -72,8 +75,10 @@ class SurfacePlots(QWidget):
for idx, scene in enumerate(self.surfaces):
for name, _ in self.surfaces[scene].items():
if scene in "assistant":
brush = QBrush(QColor(*self.colors[idx], 255), Qt.DiagCrossPattern)
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=1, style=Qt.DashLine)
brush = QBrush(QColor(*self.colors[idx], 255), Qt.BrushStyle.DiagCrossPattern)
pen = pg.mkPen(
QColor(*self.colors[idx], 255), width=1, style=Qt.PenStyle.DashLine
)
z_value = 2
else:
brush = QBrush(QColor(*self.colors[idx], 255))
@@ -92,8 +97,13 @@ class SurfacePlots(QWidget):
self.apply_theme()
def apply_theme(self, theme=None):
def apply_theme(self, theme: Optional[Literal["dark", "light"]] = None):
"""
Apply the theme
Args:
theme (Optional[str]): Theme, either "dark", "light", or None. Defaults to None.
"""
if theme is None:
app = QApplication.instance()
theme = app.theme.theme # type: ignore
@@ -121,8 +131,10 @@ class SurfacePlots(QWidget):
for idx, scene in enumerate(self.surfaces):
for name, _ in self.surfaces[scene].items():
if scene in "assistant":
brush = QBrush(QColor(*self.colors[idx], 255), Qt.DiagCrossPattern)
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=1, style=Qt.DashLine)
brush = QBrush(QColor(*self.colors[idx], 255), Qt.BrushStyle.DiagCrossPattern)
pen = pg.mkPen(
QColor(*self.colors[idx], 255), width=1, style=Qt.PenStyle.DashLine
)
else:
brush = QBrush(QColor(*self.colors[idx], 255))
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=0)
@@ -131,21 +143,18 @@ class SurfacePlots(QWidget):
for wall in self.walls:
wall.setPen(pg.mkPen(color=self.color_impenetrable, width=2))
wall.setBrush(
pg.QtGui.QBrush(pg.QtGui.QColor(*self.color_impenetrable))
) # pylint: disable=E1101
wall.setBrush(QBrush(QColor(*self.color_impenetrable)))
for text in self.texts:
text.setColor(self.text_color)
def plot_walls(self):
"""Plot walls"""
def plot_surface(widget, surfaces):
for name, surface in surfaces.items():
rect = pg.QtWidgets.QGraphicsRectItem(*surface) # pylint: disable=E1101
rect.setBrush(
pg.QtGui.QBrush(pg.QtGui.QColor(*self.color_impenetrable))
) # pylint: disable=E1101
rect.setBrush(QBrush(QColor(*self.color_impenetrable))) # pylint: disable=E1101
rect.setPen(pg.mkPen(color=self.color_impenetrable, width=2))
widget.addItem(rect)
text = pg.TextItem(name, color=self.text_color, anchor=(0.5, 0.5))
@@ -166,13 +175,21 @@ class SurfacePlots(QWidget):
plot_surface(plot["widget"], mirror_surface_geometries("fm_flat"))
plot_surface(plot["widget"], mirror_surface_geometries("fm_toroid"))
else:
raise Exception(f"Plot {name} not found!")
raise ValueError(f"Plot {name} not found!")
for name, plot in self.plots.items():
plot["widget"].disableAutoRange()
def update_surfaces(self, scene, data):
def update_surfaces(self, scene: Literal["assistant", "reality"], data: SurfaceDict):
"""Update the curves of the plot
Args:
scene (str): The scene to update, either "assistant" or "reality".
data (DataDict): The new data to plot, with keys "x" and "y",
each containing a list of values.
"""
self.surfaces[scene] = data
for name, device in self.surfaces[scene].items():
device = cast(DataDict, device)
plot = self.plots[name][scene]
x = np.array(device["x"] + [device["x"][0]]) if len(device["x"]) != 0 else np.array([])
y = np.array(device["y"] + [device["y"][0]]) if len(device["y"]) != 0 else np.array([])
@@ -195,7 +212,7 @@ class SideviewPlot(QWidget):
self.color_impenetrable = (0, 0, 0)
self.colors = [(255, 255, 0), (255, 0, 255)]
self.data = {
self.data: dict[str, DataDict] = {
"assistant": {"x": [0, 1000, 2000], "y": [0, 20, 30]},
"reality": {"x": [0, 1000, 2000], "y": [0, 15, 50]},
}
@@ -207,7 +224,7 @@ class SideviewPlot(QWidget):
for idx, scene in enumerate(self.data.keys()):
if scene in "assistant":
pen = pg.mkPen(color=self.colors[idx], width=2, style=Qt.DotLine)
pen = pg.mkPen(color=self.colors[idx], width=2, style=Qt.PenStyle.DotLine)
z_value = 2
else:
pen = pg.mkPen(color=self.colors[idx], width=2)
@@ -220,8 +237,8 @@ class SideviewPlot(QWidget):
self.plot_widget.setLabel("left", "Height [mm]")
self.plot_widget.setLabel("bottom", "Distance [mm]")
self.plot_widget.setMouseEnabled(x=False, y=False)
self.plot_widget.setXRange(0, 25000, padding=0.1)
self.plot_widget.setYRange(-20, 120, padding=0.1)
self.plot_widget.setXRange(0, 25000, 0.1) # pylint: disable=E1121 # type: ignore
self.plot_widget.setYRange(-20, 120, 0.1) # pylint: disable=E1121 # type: ignore
self.plot_widget.setMenuEnabled(False)
self.plot_widget.hideButtons()
@@ -233,8 +250,13 @@ class SideviewPlot(QWidget):
self.apply_theme()
def apply_theme(self, theme=None):
def apply_theme(self, theme: Optional[Literal["dark", "light"]] = None):
"""
Apply the theme
Args:
theme (Optional[str]): Theme, either "dark", "light", or None. Defaults to None.
"""
if theme is None:
app = QApplication.instance()
theme = app.theme.theme # type: ignore
@@ -260,8 +282,8 @@ class SideviewPlot(QWidget):
for idx, scene in enumerate(self.data):
if scene in "assistant":
brush = QBrush(QColor(*self.colors[idx], 255), Qt.DiagCrossPattern)
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=3, style=Qt.DotLine)
brush = QBrush(QColor(*self.colors[idx], 255), Qt.BrushStyle.DiagCrossPattern)
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=3, style=Qt.PenStyle.DashLine)
else:
brush = QBrush(QColor(*self.colors[idx], 255))
pen = pg.mkPen(QColor(*self.colors[idx], 255), width=3)
@@ -270,14 +292,13 @@ class SideviewPlot(QWidget):
for wall in self.walls:
wall.setPen(pg.mkPen(color=self.color_impenetrable, width=3))
wall.setBrush(
pg.QtGui.QBrush(pg.QtGui.QColor(*self.color_impenetrable))
) # pylint: disable=E1101
wall.setBrush(QBrush(QColor(*self.color_impenetrable))) # pylint: disable=E1101
for pipe in self.pipes:
pipe.setPen(pg.mkPen(color=self.color_impenetrable, width=3))
def plot_vacuum_pipes(self):
"""Plot vacuum pipes"""
pipes = pipe_geometries()
for pipe in pipes:
self.pipes.append(
@@ -287,17 +308,23 @@ class SideviewPlot(QWidget):
)
def plot_walls(self):
"""Plot walls"""
walls = wall_geometries()
for wall in walls:
rect = pg.QtWidgets.QGraphicsRectItem(*wall) # pylint: disable=E1101
rect.setBrush(
pg.QtGui.QBrush(pg.QtGui.QColor(*self.color_impenetrable))
) # pylint: disable=E1101
rect.setBrush(QBrush(QColor(*self.color_impenetrable))) # pylint: disable=E1101
rect.setPen(pg.mkPen(color=self.color_impenetrable, width=2))
self.plot_widget.addItem(rect)
self.walls.append(rect)
def update_curves(self, scene, data):
def update_curves(self, scene: Literal["assistant", "reality"], data: DataDict):
"""Update the curves of the plot
Args:
scene (str): The scene to update, either "assistant" or "reality".
data (DataDict): The new data to plot, with keys "x" and "y",
each containing a list of values.
"""
self.data[scene] = data
plot = self.plots[scene]
plot.setData(x=self.data[scene]["x"], y=self.data[scene]["y"])
@@ -9,7 +9,7 @@ from debye_bec.bec_widgets.widgets.qt_widgets import Button, Group
class SettingsPanel(QWidget):
"""Right-side control panel: input field, indicator, send, recording."""
"""Settings panel for the digital twin widget"""
def __init__(self, parent=None):
super().__init__(parent)
@@ -0,0 +1,34 @@
"""Types used for plotting data"""
from typing import TypedDict
class DataDict(TypedDict):
"""
Typed dictionary representing plot data.
Attributes:
x (list[float]): List of x-axis values.
y (list[float]): List of y-axis values.
"""
x: list
y: list
class SurfaceDict(TypedDict):
"""
Typed dictionary representing the surfaces of a scene,
grouping plot data by surface type.
Attributes:
cm (DataDict): Data for the cm surface.
mo1_1 (DataDict): Data for the mo1_1 surface.
mo1_2 (DataDict): Data for the mo1_2 surface.
fm (DataDict): Data for the fm surface.
"""
cm: DataDict
mo1_1: DataDict
mo1_2: DataDict
fm: DataDict