Merge pull request 'added AP and KM macros to main bec from x10sa production branch' (#7) from x10sa_production_20260112T142809 into main
All checks were successful
CI for pxii_bec / test (push) Successful in 31s

Reviewed-on: #7
This commit was merged in pull request #7.
This commit is contained in:
2026-01-26 10:39:15 +01:00
9 changed files with 3421 additions and 0 deletions

1522
pxii_bec/macros/APscripts.py Executable file

File diff suppressed because it is too large Load Diff

320
pxii_bec/macros/Undu_Helpers.py Executable file
View File

@@ -0,0 +1,320 @@
#!/usr/bin/env python
from pylab import *
import sys, os
# import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import scipy.interpolate
from scipy.optimize import curve_fit
from scipy.ndimage import gaussian_filter1d
def fit_harm(harm, n, order):
x = harm[0, :].astype(float)
y = harm[1, :].astype(
float
) ## else funny object that might contain funny strings ...
coeff = np.polyfit(x, y, order) # 3 in general i.e., 4 params
polynomial = np.poly1d(coeff)
x_fit = np.linspace(min(x), max(x), 100)
y_fit = polynomial(x_fit)
print("Polynomial coefficients of the harmonic: ", n, coeff)
plot(x_fit, y_fit, color="blue")
return (
x / n,
y,
) # get the normalized energy gap relation for fitting the Halbach coeff
######### simple exp fit ############
def exponential_func0(x, a, b, c):
# fit 3 params a,b,c
return a * np.exp(b * x + c * x ** 2)
######### inverse exp fit ############
def exponential_func1(x, a, b, c):
# fit 3 params a,b,c
return 1 / (1 + (a * np.exp(b * x + c * x ** 2)))
######### inverse exp fit plus E_max ############
def exponential_func2(x, a, b, c, d):
# fit 4 params a,b,c, e.g., fit energy of storage ring as well
return d / (1 + (a * np.exp(b * x + c * x ** 2)))
##################################
def return_harmon():
# from 3rd to 21th Energy range from 6 keV to 30 keV
# harmonics, July 2025
import numpy as np
nharm = 13
h = [np.array([]) for _ in range(nharm + 1)]
h[0] = np.array([0])
h[1] = np.array([0])
h[2] = np.array([0])
h[3] = np.array([1.07141725, -0.52834258]) # 3rd harmon
h[4] = np.array([0.007111, -0.13678409, 1.52295567, -1.06882785])
h[5] = np.array([0.00315051, -0.0766359, 1.13107469, -0.86442062])
h[6] = np.array([0.00162862, -0.04452336, 0.82948259, -0.37329674])
h[7] = np.array([0.00080764, -0.02418882, 0.59212947, 0.15702886])
h[8] = np.array([1.16699242e-03, -4.68207543e-02, 9.43972396e-01, -1.94201019e00])
h[9] = np.array([4.84194444e-04, -2.01064762e-02, 5.55141139e-01, -3.81800667e-01])
h[10] = np.array([2.11583333e-04, -8.03503571e-03, 3.43141595e-01, 6.02318000e-01])
h[11] = np.array([3.34391667e-03, -1.84053357e-01, 3.58338994e00, -1.93640241e01])
h[12] = np.array([3.20520798e-05, 2.39253145e-03, 8.09198503e-02, 2.22897377e00])
h[13] = np.array([0.00278744, 0.07979874, 2.05143916])
# fitpar_u19 = np.array([ 2.17078531, 0.519452, -0.00720255]) # measured from U19
return h
def plot_harmon(e_start, e_end, h_no, pr_out = False):
enarr= np.arange(e_start, e_end+1, 0.5)
h_all = return_harmon()
h=h_all[h_no]
polynomial = np.poly1d(h)
gaps = polynomial(enarr)
if (pr_out):
print("en =", enarr )
print("gaps =", gaps )
plt.ion()
plt.figure()
plt.plot(enarr,gaps,'*')
plt.title(f"harmonic no {h_no}")
plt.xlabel("E / keV")
plt.ylabel("Gap / mm")
plt.show()
def setu19(en, *harm_no, detune=0):
"""
set the U19 to the gaps defined in Jul2025, or the "theoretical" ones for higher
harmonics
USAGE:
setu19(en, *harm_no, detune=0)
en in keV, possibly select a special harmonics, or detune [0/1] to a value
with a nicer beam shape but less flux
"""
g0 = dev.id_gap.readback.get()
fitpar_u19 = np.array([2.17078531, 0.519452, -0.00720255]) # measured from U19
h_all = return_harmon()
# dep on energy -> select harmonics
if harm_no:
harm = int(harm_no[0])
print("Selected harmonics is:", harm)
else:
print("Using the default optimised harmonics")
if en <= 7:
harm = 3 # h = h[3]
elif 7 < en <= 10:
harm = 5
elif 10 < en <= 13:
harm = 7
elif 13 < en <= 16:
harm = 9
elif 16 < en <= 19:
harm = 11
elif 19 < en <= 22:
harm = 13
if en <= 22:
h = h_all[harm]
print(f"harmonics is {harm}.")
polynomial = np.poly1d(h)
g = polynomial(en)
# print(f"For energy {en} the used harmonic is {polynomial} for gap {g}")
## for time being: if E > 20 : use the "theoretical", old measured and interpolated values up to E = 30 keV
# 21, 22 : 13
# 23, 24, 25: 15;
# 26: 15 or 17
# 27, 28: 17
# 29: 15, 17, 19
# 30: 19
##
if 23 < en <= 25:
n = 15
enorm = en / n
g = exponential_func0(enorm, fitpar_u19[0], fitpar_u19[1], fitpar_u19[2])
elif 25 < en <= 29:
n = 17
enorm = en / n
g = exponential_func0(enorm, fitpar_u19[0], fitpar_u19[1], fitpar_u19[2])
elif 29 < en:
n = 19
enorm = en / n
g = exponential_func0(enorm, fitpar_u19[0], fitpar_u19[1], fitpar_u19[2])
print("Undulator has been ", g0, " mm")
if 4.5 <= g <= 20:
print("Moving Undulator gap to ", g, " mm")
else:
print("not a valid gap, do nothing")
if detune:
g =g *0.996
print("moving to detuned gap value, slightly below max, about 0.15 % ")
#print("move disabled!!")
res = scans.umv(dev.id_gap, g, relative=False)
return
##################################
def harmon_walk(estart=7.5, end_en=13):
import time
en = estart
ans ='y'
while en < end_en+0.5 and ans == 'y':
print(en)
setu19(en, 5)
time.sleep(2)
sete(en)
en = en+0.5
ans = input("Next energy? y/n: ")
##################################
def gap_harm(e=12.4):
fitpar_u19 = np.array([2.17078531, 0.519452, -0.00720255])
fitpar_u17 = np.array([2.17078572, 0.46477267, -0.005766])
e = float(input("Please enter an energy between 6 and 30: "))
for n in range(3, 23):
# n = int(input('Please enter a harmonic n, n between 3 and 15 '))
enorm = e / n
g_19 = exponential_func0(enorm, fitpar_u19[0], fitpar_u19[1], fitpar_u19[2])
g_17 = exponential_func0(enorm, fitpar_u17[0], fitpar_u17[1], fitpar_u17[2])
print(f"harmonic {n}, gap for U19 = {g_19:.5f}, for U17 = {g_17:.5f}")
# print(f"harmonic {n}, gap for U17 = {g_17}")
return
##################################
def long_gscan(estart=7, end_en=20.5, g_low=4.5, g_high=9.0, nsteps=1500):
import time
import numpy as np
dirname = "/home/gac-x10sa/Data/"
print(
f"scanning the U19 gap from {estart} keV to {end_en} keV, for a gapsize from {g_low} to {g_high}"
)
resol = (g_high - g_low) / nsteps
print(f"nsteps = {nsteps}; resolution is {resol} mm")
dock_area = bec.gui.new("LongGapScan")
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
mot = dev.id_gap
det = dev.lu_bpmsum
wr.plot(x_name=mot.name, y_name=det.name) ## names first !
wr.x_label = mot.name
wr.y_label = det.name
g0 = dev.id_gap.readback.get()
# g_low = 4.5 # 4.5
# g_high = 9.0 # 9.0
# nsteps = 1500 # res = 3 um
en = estart
while en < end_en:
sete(en)
time.sleep(1)
rock()
print(f"setting energy to {en}")
time.sleep(2)
ds = scans.line_scan(
dev.id_gap, g_low, g_high, steps=nsteps, exp_time=0.8, relative=False
)
gap_data = ds.scan.live_data.id_gap.id_gap.val
bpm_data = ds.scan.live_data.lu_bpmsum.lu_bpmsum.val
wr.plot(x=gap_data, y=bpm_data)
# writing output to simple data file for later analysis:
combined = np.column_stack((gap_data, bpm_data))
filename = dirname + "gaps" + str(en) + ".txt"
with open(filename, "w") as f:
np.savetxt(f, combined, delimiter=",", fmt="%5f")
en += 1
return
##################################
def gscan(centre=0, gomax=0, detune=0):
"""
Scan the ID GAP and go
to the max of lu_bpm intensity
gscan(centre=0): just scan
gscan(centre=1): go to centre of fit max
gscan(centre=1, gomax=1): go to max of intensity
gscan(centre=1,detune=1): position of slightly less flux with nicer beam shape
"""
import time
dock_area = bec.gui.new()
wr = dock_area.new().new(bec.gui.available_widgets.Waveform)
mot = dev.id_gap
det = dev.lu_bpmsum
wr.plot(x_name=mot.name, y_name=det.name) ## names first !
# wr.plot(x=mot.name,y=det.name) ### this comes later
wr.x_label = mot.name
wr.y_label = det.name
g0 = dev.id_gap.readback.get()
deltag = 0.05
ds = scans.line_scan(
dev.id_gap, -deltag, deltag, steps=30, exp_time=0.5, relative=True
)
gap_data = ds.scan.live_data.id_gap.id_gap.val
bpm_data = ds.scan.live_data.lu_bpmsum.lu_bpmsum.val
#maxy = max(bpm_data)
#indmax = np.argmax(bpm_data)
#gm = gap_data[indmax]
gcen, xm = fit_plot(gap_data, bpm_data, model="gauss")
if gomax:
gm = xm
else:
gm = gcen
print("gap off by ", g0 - gm, " mm")
if detune:
gm=gm*0.996
print("moving to detuned gap value, slightly (0.15 %) below max")
if centre:
time.sleep(0.2)
if min(gap_data) <= gm <= max(gap_data):
scans.umv(dev.id_gap, gm, relative=False)
print("moving to ", gm, " mm")
else:
print("Fit too far off, try using option gomax=1")
return

0
pxii_bec/macros/__init__.py Normal file → Executable file
View File

355
pxii_bec/macros/calculator.py Executable file
View File

@@ -0,0 +1,355 @@
"""Utility functions for calculating energy, wavelength, and Bragg angle."""
from dataclasses import dataclass
import numpy as np
# from pxii_parameters import (EnergyDefaults, CamConversion)
@dataclass(frozen=True)
class Constants:
"""Constants used in energy calculations"""
# # Physical Constants from https://physics.nist.gov/cuu/Constants/index.html
ANGSTROM_CONVERSION = 1e10 # Convert meters to angstrom
PLANCK_CONST_EV = 4.135667696e-15 # eV/Hz
SPEED_OF_LIGHT = 299792458 # m/s
# d-spacings
d_spacing = {120: 3.13481, 298: 3.13562}
def speed_of_light_ang():
"""
Calculate the speed of light in angstroms per second.
Returns:
float: The speed of light converted to angstroms per second.
"""
return Constants.SPEED_OF_LIGHT * Constants.ANGSTROM_CONVERSION
def en_wav_factor():
"""
Calculate the energy wavelength factor.
This function computes a constant factor used to calculate energy
values in relation to wavelength by combining Planck's constant,
in eV/Hz, and the speed of light in angstrom.
Returns:
float: The computed energy wavelength factor.
"""
return Constants.PLANCK_CONST_EV * speed_of_light_ang()
# Helper Functions
def convert_to_degrees(angle_mrad: float) -> float:
"""
Convert an angle from milliradians to degrees.
Args:
angle_mrad: The angle value in milliradians.
Returns:
The angle converted into degrees as a float.
"""
return np.rad2deg(angle_mrad / 1000)
def create_conversion_result(
energy_ev: float, wavelength: float, bragg_angle_mrad: float
) -> dict:
"""
Creates a dictionary containing converted values of energy and angles.
This function takes the energy in electron-volts, the wavelength,
and the Bragg angle in milliradians as input. It computes and
returns a dictionary containing the energy in both electron-volts
and kiloelectron-volts, the wavelength, the Bragg angle in milliradians,
and the Bragg angle converted to degrees.
Args:
energy_ev: Energy value in electron-volts.
wavelength: Wavelength value.
bragg_angle_mrad: Bragg angle in milliradians.
Returns:
dict: A dictionary containing the following keys:
- "energy_kev": Energy value in kiloelectron-volts.
- "energy_ev": Energy value in electron-volts.
- "wavelength": Wavelength value.
- "bragg_angle_mrad": Bragg angle in milliradians.
- "bragg_angle_deg": Bragg angle in degrees.
"""
return {
"energy_kev": energy_ev / 1000,
"energy_ev": energy_ev,
"wavelength": wavelength,
"bragg_angle_mrad": float(bragg_angle_mrad),
"bragg_angle_deg": float(convert_to_degrees(bragg_angle_mrad)),
}
def print_conversion_result(result: dict) -> None:
"""
Prints the energy-related conversion results to the console.
"""
line = (
f"energy: {result['energy_ev']:.6g} eV, energy: {result['energy_kev']:.6g} keV, "
f"wavelength: {result['wavelength']:.4g} Å, "
f"bragg angle: {result['bragg_angle_mrad']:.5g} mrad, {result['bragg_angle_deg']:.4g} deg"
)
print(line)
# Conversion Functions
def calculate_wavelength_from_angle(bragg_angle_mrad: float, temp=120) -> float:
"""
calculate_wavelength_from_angle(bragg_angle_mrad: float) -> float
Arguments:
bragg_angle_mrad: The Bragg angle in milliradians, used to compute the
sine value required for the wavelength calculation.
Returns:
The calculated wavelength as a float value.
"""
d = Constants.d_spacing[temp]
return 2 * d * np.sin(bragg_angle_mrad / 1000)
def calculate_energy_from_wavelength(wavelength: float) -> float:
"""
Calculates the energy of a photon based on its wavelength.
Args:
wavelength: The wavelength of the photon in angstrom.
Returns:
The energy of the photon in eV.
"""
return en_wav_factor() / wavelength
def calculate_wavelength_from_energy(energy_ev: float) -> float:
"""
Calculates the wavelength of a photon from its energy.
Arguments:
energy_ev: float
The energy of the photon in electronvolts (eV).
Returns:
float
The calculated wavelength of the photon in angstrom.
"""
return en_wav_factor() / energy_ev
def calculate_bragg_angle_from_wavelength(wavelength: float, temp=120) -> float:
"""
Calculate the Bragg angle in milliradians for a given wavelength.
Args:
wavelength: The wavelength in angstrom.
Returns:
The Bragg angle in milliradians as a float.
"""
d = Constants.d_spacing[temp]
angle_rad = np.arcsin(wavelength / (2 * d))
return angle_rad * 1000
def convert_input_angle_to_mrad(bragg_angle: float) -> float:
"""
Convert input angle into milliradians (mrad).
This function takes an angle as input and determines its likely unit,
converting it to milliradians (mrad) if necessary. If the input value
is less than 1, it is assumed to be in radians and is converted to
mrad. If the input value falls between predefined minimum and
maximum values for mrad, it is assumed to be in degrees and thus
converted to mrad using the degrees-to-radians conversion factor.
For input values that don't match these scenarios, it assumes
that the input is already in mrad and returns it unchanged.
Arguments:
bragg_angle (float): The input Bragg angle, which can be in
radians, degrees, or milliradians.
Returns:
float: The Bragg angle converted into milliradians (mrad).
"""
if bragg_angle < 1: # Likely the input angle is in radians
return bragg_angle * 1000
if 3 < bragg_angle < 25: # Likely input angle is in degrees
return np.deg2rad(bragg_angle) * 1000
return bragg_angle # Already in mrad
# Core Functions
def validate_energy(energy_ev):
"""
Validates the energy value to ensure it falls within the acceptable range. The function
converts the provided energy from keV to eV if the input value is less than 1/1000 of the
maximum energy value. It then checks whether the energy is within the defined bounds.
If the energy value is outside the acceptable range, the function raises a ValueError.
Args:
energy_ev (float): The energy value in eV or keV to be validated. If this value is
smaller than 1/1000 of the maximum allowed energy (in eV), it will be multiplied
by 1000 to convert it from keV to eV.
Returns:
float: The validated energy value in eV that falls within the acceptable range.
Raises:
ValueError: If the energy value is outside the defined range of
[MIN_ENERGY_EV, MAX_ENERGY_EV].
"""
if energy_ev < EnergyDefaults.max_energy_ev / 1000: # Assuming the input is in keV.
energy_ev *= 1000
if not EnergyDefaults.min_energy_ev <= energy_ev <= EnergyDefaults.max_energy_ev:
raise ValueError(
f"Energy of {energy_ev} eV is outside the valid range "
f"({EnergyDefaults.min_energy_ev} eV to {EnergyDefaults.max_energy_ev} eV)"
)
return energy_ev
def convert_from_bragg(
bragg_angle_mrad: float, temp=120, print_result: bool = False
) -> dict:
"""
Convert the Bragg angle to wavelength and energy, returning the result as a dictionary.
This function converts a given Bragg angle (in milliradians) into the corresponding
wavelength and energy values, and returns them in a dictionary format. The function
also supports optional printing of the calculated results.
Args:
bragg_angle_mrad (float): The Bragg angle in milliradians to be converted.
print_result (bool): Whether to print the conversion result. Defaults to False.
Returns:
dict: A dictionary containing the following keys:
- 'energy_ev': Energy in electronvolts.
- 'wavelength': Wavelength corresponding to the input angle.
- 'bragg_angle_mrad': Input Bragg angle in milliradians.
"""
bragg_angle_mrad = convert_input_angle_to_mrad(bragg_angle_mrad)
wavelength = float(calculate_wavelength_from_angle(bragg_angle_mrad, temp=temp))
energy_ev = float(calculate_energy_from_wavelength(wavelength))
result = create_conversion_result(energy_ev, wavelength, bragg_angle_mrad)
if print_result:
print_conversion_result(result)
return result
def convert_from_energy(energy_ev: float, temp=120, print_result: bool = False) -> dict:
"""
Convert energy in electron volts (eV) to wavelength and Bragg angle in milliradians
(mrad). This method validates the given energy, calculates corresponding properties,
and optionally prints the result.
Args:
energy_ev: Energy value in electron volts (float) to be converted.
print_result: Flag indicating whether to print the resulting
conversion details (bool). Defaults to False.
Returns:
A dictionary containing the following key-value pairs:
- "energy_ev" (float): Validated energy in eV.
- "wavelength" (float): Calculated wavelength in meters.
- "bragg_angle_mrad" (float): Calculated Bragg angle in mrad.
"""
energy_ev = validate_energy(energy_ev)
wavelength = calculate_wavelength_from_energy(energy_ev)
bragg_angle_mrad = float(
calculate_bragg_angle_from_wavelength(wavelength, temp=temp)
)
result = create_conversion_result(energy_ev, wavelength, bragg_angle_mrad)
if print_result:
print_conversion_result(result)
return result
def convert_from_wavelength(
wavelength: float,
temp: float = 120,
print_result: bool = False,
) -> dict:
"""
Convert a given wavelength value into corresponding energy, Bragg angle, and
generate a result dictionary.
The function processes a wavelength value, checks its validity against a
permitted range, calculates corresponding energy and Bragg angle, and
formats the results into a dictionary. Optionally, the function can print
the result.
Parameters:
wavelength: float
The input wavelength value in Angstroms to be converted. Should
fall within the permitted wavelength range.
print_result: bool
Optional flag indicating whether to print the conversion result.
Default is False.
Returns:
dict
A dictionary containing the energy (electron-volts), wavelength
(Angstroms), and Bragg angle (milliradians). If the wavelength is
outside of the permitted range, returns None.
"""
energy_ev = calculate_energy_from_wavelength(wavelength)
bragg_angle_mrad = float(
calculate_bragg_angle_from_wavelength(wavelength, temp=temp)
)
result = create_conversion_result(energy_ev, wavelength, bragg_angle_mrad)
if print_result:
print_conversion_result(result)
return result
def calc_perp_position(
energy_ev: float,
print_result: bool = False,
) -> float:
"""
Calculate the perpendicular motor position based on provided energy in electron-volts (eV).
This function computes the perpendicular motor position using the given energy value in
electron-volts. The calculation is based on the Bragg angle derived from the energy. An optional
parameter allows printing the result during execution.
Parameters:
energy_ev (float): The energy value in electron-volts used for the calculation.
print_result (bool): Flag to determine whether to print the computed perpendicular offset.
Default is False.
Returns:
float: The computed perpendicular position.
Raises:
None
"""
result = convert_from_energy(energy_ev, print_result=False)
bragg_angle_rad = result["bragg_angle_mrad"] / 1000
perp_offset = float(EnergyDefaults.beam_offset / (2 * np.cos(bragg_angle_rad))) - 3
if print_result:
print(f"Perp = {perp_offset: .4f}")
return perp_offset
def calc_scam_microns(pixels, zoom=1000):
"""Convert pixels to microns for the sample camera"""
return float(pixels / (CamConversion.a * np.exp(CamConversion.b * zoom)))
def calc_bsccam_microns(pixels):
"""Convert pixels to microns for the BSC camera"""
return pixels*20

126
pxii_bec/macros/flux_di.py Executable file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python
# ### PYTHON pro to determine the flux from the current
# ### on a Si diode
# ### A Pauluhn, Dec 2012
# ### from IDL pro flux_diode.pro (2010)
## NOTE: for TRANSFER DIODE:
## switch off ANY ambient light
## note that: 10 um Si, and 0 um Al for transfer diode Hamamatsu
# ### import the standard
import os, sys, string, time
import re
import numpy as np
import math
def fluxcalc(curr, en, t_air, t_si, t_al):
#------ parameters:
rsi = 2.33 # g/cm^3 density of Si
ral = 2.699 # g/cm^3 density of Al
rair = 1.205e-3 # g/cm^3 density of air
eps_si = 3.62 # eV , energy req for charge separation in Si (el/hole pair)
e = 1.602e-19 # As , elem charge
t_si = 0.0012 # thickness of diode in cm (==12 micrometres) ; [*10000]
t_al = 0.002 # thickness of diode in cm (==20 micrometres)
#--------- calc----------------------------------------------------
#energy deposit in Silicon
#ratio of photoelectric cross section to density for Silicon
#alog10(A_Si)= 4.158 - 2.238*alog10(Ep) - 0.477*alog10(Ep)^2 + 0.0789*alog10(Ep)^3
ps = np.poly1d([0.0789, - 0.477, - 2.238, 4.158])
#print ps
# ps.c
# print ps(2.3)
leval = math.log10(en)
hlp_si = ps(leval)
a_si = np.pow(10, hlp_si)
#efact = np.exp(-A_Si*rsi*t_Si)
#sifact =1 - efact
sifact = - np.expm1(-a_si*rsi*t_si)
#print 'sifact = ', sifact
fl = 1000.*curr * eps_si/1.602/en/sifact *1.e13 # curr expected in A
#print 'fl = ', fl
#Aluminium attenuation
#ratio of photoelectric cross section to density for Aluminium
pa = np.poly1d([ 0.0638, - 0.413, - 2.349, 4.106])
hlp_al = pa(leval)
a_al = pow(10, hlp_al)
# attenuation due to aluminium
alfact = np.exp(-a_al*ral*t_al)
#print 'alfact = ', alfact
# Air attenuation
# ratio of photoelectric cross section to density for air
pair = np.poly1d([0.928, - 2.348, - 1.026, 3.153])
hlp_air = pair(leval)
a_air = np.pow(10, hlp_air)
#print('hlp_air, a_air, rair, t_air = ', hlp_air, a_air, rair , t_air)
# attenuation due to air
airfact = np.exp(-a_air*rair*t_air/10.)
#airfact = 1.
#print 'airfact = ', airfact
#print ' t_air = ', t_air
# total flux from photocurrent
fl = fl/alfact/airfact
#print 'fl = ', fl
return fl
#
# Main
#
def flux_x10sa():
curr = input('Please enter the current [in A] ')
curr = float(curr)
print('Current = ', curr)
en = input('Please enter the energy [in keV] ')
en = float(en)
print('Energy = ', en)
## d_det = input('Please enter the detector distance [in mm] ')
d_det = dev.det_z.user_readback.get()
d_off = 15 ## at X10SA ### input('Please enter the OFFSET of the diode to det surface [in mm] ')
t_air = float(d_det) + float(d_off)
print('Air path length (CARE ! add det distance and offset of diode) = ', t_air) # include additional diode distance
# t_si = input('Please enter the Si thickness of diode [in um] ')
# t_si = float(t_si) / 10000. # to cm
t_si = 12 # um ## at X10SA
# t_al = input('Please enter the Al thickness of diode [in um] ')
# t_al = float(t_al) / 10000. # to cm
t_al = 20 # um ## at X10SA
fl = fluxcalc(curr, en, t_air,t_si, t_al)
print( " Diode Current = %.6f A " % curr)
print( " Energy = %.4f keV " % en )
print( " Thickness of active Si layer = %.4f\t cm " % t_si) #*10000.
print( " Thickness of Al layer in front of diode = %.4f\t cm " % t_al) #*10000.
print( " Length of path of air in front of diode = %.4f\t mm " % t_air) #*10000.
print (" Flux = %6.2e\t ph/s " % fl)
return fl

254
pxii_bec/macros/mx_basics.py Executable file
View File

@@ -0,0 +1,254 @@
"""Get data from an h5 file or BEC history and perform fitting."""
import numpy as np
from lmfit.models import (
GaussianModel,
LorentzianModel,
VoigtModel,
ConstantModel,
LinearModel,
)
from scipy.ndimage import gaussian_filter1d
import h5py
import matplotlib.pyplot as plt
def create_fit_parameters(
deriv: bool = False,
model: str = "Voigt",
baseline: str = "Linear",
smoothing: None = None,
):
"""Store the fit parameters in a dictionary."""
# map input model to lmfit model name
model_mappings = {
"Gaussian": GaussianModel,
"Lorentzian": LorentzianModel,
"Voigt": VoigtModel,
"Constant": ConstantModel,
"Linear": LinearModel,
}
return {
"deriv": deriv,
"model": model_mappings[model],
"baseline": model_mappings[baseline],
"smoothing": smoothing,
}
def get_data_from_h5(signal_name: str = "lu_bpmsum"):
"""Get data from an h5 file."""
with h5py.File("scan_676.h5", "r") as f:
entry = f["entry"]["collection"]
y_data = entry["devices"][signal_name][signal_name]["value"][:]
motor_data = entry["metadata"]["bec"]
motor_name = motor_data["scan_motors"][0].decode()
scan_number = motor_data["scan_number"][()]
x_data = entry["devices"][motor_name][motor_name]["value"][:]
return {
"x_data": x_data,
"y_data": y_data,
"signal_name": signal_name,
"motor_name": motor_name,
"scan_number": str(scan_number),
}
def get_data_from_history(
history_index: int,
signal_name: str = "lu_bpmsum",
):
"""Read data from the BEC history and return the X and Y data as arrays."""
scan = bec.history[history_index]
md = scan.metadata["bec"]
motor_name = md["scan_motors"][0].decode()
scan_number = md["scan_number"]
x_data = scan.devices[motor_name][motor_name].read()["value"]
y_data = scan.devices[signal_name][signal_name].read()["value"]
return {
"signal_name": signal_name,
"x_data": x_data,
"y_data": y_data,
"motor_name": motor_name,
"scan_number": scan_number,
}
def process_data(data, fit_params):
"""
Process the signal data for fitting based on derivative or smoothing.
"""
smoothing, deriv = fit_params["smoothing"], fit_params["deriv"]
signal_name = data["signal_name"]
y_data = data["y_data"]
if deriv:
if smoothing:
y_smooth = gaussian_filter1d(y_data, smoothing)
fitting_data = np.gradient(y_smooth)
signal_name = f"Derivative of smoothed {signal_name}"
else:
fitting_data = np.gradient(y_data)
signal_name = f"Derivative of {signal_name}"
elif smoothing and smoothing > 0.01:
fitting_data = gaussian_filter1d(y_data, smoothing)
signal_name = f"Smoothed {signal_name}"
else:
fitting_data = y_data
updated_data = {
"y_to_fit": fitting_data,
"signal_name": signal_name,
}
data.update(updated_data)
return data
def fit(data, fit_params):
"""Fit a signal to a model and return the fitting results."""
# Create the model
peak_model = fit_params["model"](prefix="peak_")
baseline_model = fit_params["baseline"](prefix="base_")
full_model = peak_model + baseline_model
# Prepare data
processed_data = process_data(data, fit_params)
params = full_model.make_params()
y_min = np.min(processed_data["y_to_fit"])
# Configure baseline parameters
if fit_params["baseline"] == ConstantModel:
params["base_c"].set(value=y_min)
elif fit_params["baseline"] == LinearModel:
params["base_intercept"].set(value=y_min)
params["base_slope"].set(value=0)
# Add peak-specific parameters
params.update(
peak_model.guess(processed_data["y_to_fit"], x=processed_data["x_data"])
)
# Perform the fitting
lmfit_result = full_model.fit(
processed_data["y_to_fit"], params, x=processed_data["x_data"]
)
# Find the X that gives the max Y
max_index = np.argmax(processed_data["y_to_fit"])
x_max = processed_data["x_data"][max_index]
# Collect results
return {
"model": fit_params["model"].__name__,
"fwhm": lmfit_result.params["peak_fwhm"].value,
"centre": lmfit_result.best_values["peak_center"],
"height": lmfit_result.params["peak_height"].value,
"chi_sq": lmfit_result.chisqr,
"lmfit_result": lmfit_result,
"x_max": x_max,
}
def plot_fitted_data(data, fit_result):
"""Plot the original data and the fitted model."""
plt.plot(data["x_data"], data["y_to_fit"], label="Data")
plt.plot(
data["x_data"],
fit_result["lmfit_result"].best_fit,
"-",
label=f"FWHM = {fit_result['fwhm']:.3f},"
f"Centre = {fit_result['centre']:.3f}, "
f"Height = {fit_result['height']:.3f}",
)
plt.xlabel(data["motor_name"])
plt.ylabel(data["signal_name"])
plt.title(f"Scan {data['scan_number']}, fitted with {fit_result['model']}")
plt.grid(True)
plt.legend()
plt.show()
def select_bec_window(dock_area_name="Fitting"):
"""Check to see if the fitting results dock is already open and re-create it if not"""
open_docks = bec.gui.windows
if open_docks.get(dock_area_name) is None:
dock_area = bec.gui.new(dock_area_name)
wf = dock_area.new("Plot").new(bec.gui.available_widgets.Waveform)
text_box = dock_area.new("Results", position="bottom").new(
widget=bec.gui.available_widgets.TextBox
)
else:
wf = bec.gui.Fitting.Plot.Waveform
text_box = bec.gui.Fitting.Results.TextBox
return wf, text_box
def plot_live_data_bec(
motor_name,
signal_name,
window_name="Fitting"
):
"""
Plotting live data for motor and signal using BEC.
This function plots live data from a specified motor and signal.
It clears the current plot window, sets its title, labels the axes
with the provided motor and signal names, and initializes live plotting
on the given signal against the motor.
Args:
motor_name (str): The name of the motor to be used as the x-axis.
signal_name (str): The name of the signal to be used as the y-axis.
Returns:
None
"""
wf, text_box = select_bec_window(window_name)
text_box.set_plain_text("Plotting live data")
wf.clear_all()
wf.title = "Scan: Live scan"
wf.x_label = motor_name
wf.y_label = signal_name
wf.plot(x_name=motor_name, y_name=signal_name)
def plot_fitted_data_bec(
data,
fit_result,
):
"""
Plot fitted data and display fitting parameters in the specified window.
This function selects a BEC window and plots the original data along with the
fitted function. Additionally, it displays the fitting results in a text
box within the same window for better visualization of the fit results.
Parameters:
data : dict
Dictionary containing the original dataset, where 'x_data' and 'y_to_fit'
hold the independent variable and the dependent variable, respectively,
'scan_number' represents the scan number, 'motor_name' and 'signal_name'
provide axis labels.
fit_result : dict
Dictionary containing the results of the fit, including parameters such
as 'centre', 'fwhm', 'height', and the fitted model stored under
'lmfit_result', with its 'best_fit' attribute representing the fitted data.
"""
wf, text_box = select_bec_window()
fit_text = (
f"Fit parameters: Centre = {fit_result['centre']:.4f}, "
f"FWHM = {fit_result['fwhm']:.3f}, "
f"Height = {fit_result['height']:.4f}\n"
f"Model = {fit_result['model']}\n"
f"Chi sq = {fit_result['chi_sq']:.3g}"
)
text_box.set_plain_text(fit_text)
wf.clear_all()
wf.title = f"Scan: {data['scan_number']}"
wf.x_label = data["motor_name"]
wf.y_label = data["signal_name"]
wf.plot(x=data["x_data"], y=data["y_to_fit"], label="data")
wf.plot(
x=data["x_data"], y=fit_result["lmfit_result"].best_fit, label="Fit to data"
)

344
pxii_bec/macros/mx_methods.py Executable file
View File

@@ -0,0 +1,344 @@
"""Use the methods in mx_basics to perform:
1) a go_to_peak scan, that scans a motor, finds the peak position and moves to peak
2) fits data from a bec history file
"""
import numpy as np
# from pxiii_parameters import FitDefaults, BPMScans, MirrorConfig
# from mx_basics import (
# create_fit_parameters,
# get_data_from_history,
# fit,
# plot_fitted_data_bec,
# plot_live_data_bec,
# )
# Method functions
def calculate_step_size(start: float, stop: float, steps: int) -> float:
"""
Provides the function to calculate the step size for dividing a specified range
into a given number of steps.
Args:
start: The starting value of the range.
stop: The stopping value of the range.
steps: The number of steps to divide the range into. Must be at least 1.
Raises:
ValueError: If the steps value is less than 1.
Returns:
The calculated step size as a float, rounded to three decimal places.
"""
if steps < 1:
raise ValueError("Number of steps must be at least 1.")
return round((stop - start) / steps, 3)
def move_to_position(motor_device, motor_name: str, position: float, data: dict):
"""
Function to move a specified motor device to a given position.
The function verifies if the requested position is within the scan range of the
motor device provided. If the position is outside the range, the motor is
moved to the center of its scan range, an error message is raised, and the
operation is halted. If the position is valid, the motor is moved to the
specified position.
Parameters:
motor_device: The motor device to be moved.
motor_name: str
The name of the motor as a string
position: float
The desired position to move the motor to. Position should be within
the scan range of the motor determined by the provided data.
data: dict
A dictionary containing "x_data", which is used to determine the
scan range of the motor.
Raises:
ValueError: Raised if the specified position is outside the valid scan
range determined by "x_data" in the data dictionary. The motor will
return to the center of its scan range in this case.
"""
motor_min = np.min(data["x_data"])
motor_max = np.max(data["x_data"])
motor_centre = (motor_max + motor_min) / 2
if not motor_min <= position <= motor_max:
umv(motor_device, motor_centre)
msg = (
f"Position {position: .2f} is outside the scan range of "
f"{motor_min: .2f}to {motor_max: .2f}. "
f"Returning to centre of scan range {motor_centre: .3f}."
)
raise ValueError(msg)
motor_position = round(position, 4)
umv(motor_device, motor_position)
print(f"\n Moving {motor_name} to position {motor_position: .3f}")
def go_to_peak(
motor_device,
signal_device,
start: float,
stop: float,
steps: int,
relative: bool = FitDefaults.RELATIVE_MODE,
plot: bool = True,
settle: float = FitDefaults.SETTLE_TIME,
confirm: bool = True,
gomax: bool = False,
):
"""
Go to the peak of a signal by scanning a motor within a specified range and
identifying the optimal position based on signal peak data.
Parameters:
motor_device: The motor device to be scanned.
signal_device: The signal device to monitor during the scan.
start (float): The starting position of the scan. Ignored if `relative` is True.
stop (float): The ending position of the scan. Ignored if `relative` is True.
steps (int): The number of steps to divide the scan range into.
relative (bool, optional): If True, interpret `start` and `stop` as relative to
the current motor position. Defaults to RELATIVE_MODE constant.
plot (bool, optional): If True, plot the scan data and the fitted results.
Defaults to True.
settle (float, optional): The time in seconds to wait after each step for the
signal to stabilize. Defaults to DEFAULT_SETTLE_TIME constant.
confirm (bool, optional): If True, ask for user confirmation before starting
the scan. Defaults to True.
Raises:
Exception: Raises exceptions potentially raised by dependent functions or
operations such as plotting, fitting, or motor movement.
Returns:
None
"""
motor_name = motor_device.name
signal_name = signal_device.name
# wf.plot(x_name=motor_name, y_name=signal_name)
if plot:
plot_live_data_bec(motor_name, signal_name)
# Validate and calculate step size
step_size = calculate_step_size(start, stop, steps)
# Confirm the scan range
# current_motor_position = motor_device.user_readback.get()
current_motor_position = motor_device.read()[motor_name]["value"]
if confirm:
if relative:
scan_start = current_motor_position + start
scan_end = current_motor_position + stop
print(
f"\nScanning from {scan_start: .6g} to {scan_end: .6g} in "
f"{steps} steps of size {step_size}"
)
print(f"Relative mode = {relative}")
else:
print(
f"\nScanning from {start: .5g} to {stop: .5g} in {steps} steps of size {step_size}"
)
print(f"Relative mode = {relative}")
input("Press Enter to continue...")
# Perform the scan
scan_result = scans.line_scan(
motor_device, start, stop, steps=steps, relative=relative, settling_time=settle
)
motor_data = scan_result.scan.live_data[motor_name][motor_name].val
signal_data = scan_result.scan.live_data[signal_name][signal_name].val
scan_number = "Current"
data = {
"x_data": np.array(motor_data),
"y_data": np.array(signal_data),
"motor_name": motor_name,
"signal_name": signal_name,
"motor_device": motor_device,
"scan_number": scan_number,
}
# Define and fit model to scan data
fit_params = create_fit_parameters(False, FitDefaults.MODEL, FitDefaults.BASELINE)
fit_result = fit(data, fit_params)
# Plot the fitted data if plot = True
if plot:
plot_fitted_data_bec(data, fit_result)
# If gomax is set then move to the maximum value, rather than the fit centre
if gomax:
value = fit_result["x_max"]
print(f"Max position is at {value}")
move_to_position(
data["motor_device"], data["motor_name"], fit_result["x_max"], data
)
else:
# Safely move the motor to the peak position
move_to_position(
data["motor_device"], data["motor_name"], fit_result["centre"], data
)
def fit_history(
history_index: int,
signal_name: str,
deriv: bool = False,
model: str = FitDefaults.MODEL,
move_to_peak: bool = False,
):
"""
Retrieve and analyze historical data by fitting a model, optionally moving to
a peak position.
Parameters:
history_index (int): Index of the historical data set to retrieve.
signal_name (str): Name of the signal to fit.
deriv (bool, optional): Whether to include the derivative in the fitting
procedure. Defaults to False.
model (str, optional): Name of the model to use for fitting. Defaults to
DEFAULT_MODEL.
move_to_peak (bool, optional): Whether to move the motor to the peak position
after fitting. Defaults to False.
Raises:
KeyError: If required keys are not found in the retrieved data dictionary.
ValueError: If the fitting process fails or produces invalid results.
Returns:
None
"""
# Retrieve historical data
data = get_data_from_history(history_index, signal_name)
# Define fitting parameters
fit_params = create_fit_parameters(deriv, model, FitDefaults.BASELINE)
# Perform fit and plot the data
fit_result = fit(data, fit_params)
plot_fitted_data_bec(data, fit_result)
# Optionally move the motor to the peak position
if move_to_peak:
move_to_position(
data["motor_device"], data["motor_name"], fit_result["centre"], data
)
def scan_bpm(bpmname):
"""
Runs a grid scan of a BPM in x and y, and plots each channel
as a heatmap.
Parameters:
bpmname: the name of the bpm to be scanned e.g. "fe"
"""
# Open a dock area and set up the heatmaps
dock_area = bec.gui.new("XBPM_Scan")
wf5 = dock_area.new("Sum").new(bec.gui.available_widgets.Heatmap)
wf1 = dock_area.new("Ch1", relative_to="Sum", position="bottom").new(
bec.gui.available_widgets.Heatmap
)
wf3 = dock_area.new("Ch3", relative_to="Ch1", position="right").new(
bec.gui.available_widgets.Heatmap
)
wf4 = dock_area.new("Ch4", relative_to="Ch3", position="bottom").new(
bec.gui.available_widgets.Heatmap
)
wf2 = dock_area.new("Ch2", relative_to="Ch1", position="bottom").new(
bec.gui.available_widgets.Heatmap
)
wfscan = dock_area.new("ScanControl").new(bec.gui.available_widgets.ScanControl)
cfg = getattr(BPMScans, bpmname)
wf1.x_label = cfg["x_name"]
wf1.y_label = cfg["y_name"]
wf1.plot(
x_name=cfg["x_name"],
y_name=cfg["y_name"],
z_name=cfg["z1_name"],
color_map="plasma",
)
wf2.x_label = cfg["x_name"]
wf2.y_label = cfg["y_name"]
wf2.plot(
x_name=cfg["x_name"],
y_name=cfg["y_name"],
z_name=cfg["z2_name"],
color_map="plasma",
)
wf3.x_label = cfg["x_name"]
wf3.y_label = cfg["y_name"]
wf3.plot(
x_name=cfg["x_name"],
y_name=cfg["y_name"],
z_name=cfg["z3_name"],
color_map="plasma",
)
wf4.x_label = cfg["x_name"]
wf4.y_label = cfg["y_name"]
wf4.plot(
x_name=cfg["x_name"],
y_name=cfg["y_name"],
z_name=cfg["z4_name"],
color_map="plasma",
)
wf5.x_label = cfg["x_name"]
wf5.y_label = cfg["y_name"]
wf5.plot(
x_name=cfg["x_name"],
y_name=cfg["y_name"],
z_name=cfg["z5_name"],
color_map="plasma",
)
# Run the scan
x_mot = cfg["x_device"]
y_mot = cfg["y_device"]
# scans.grid_scan(x_mot, -0.5, 0.5, 20, y_mot, -0.5, 0.5, 20,
# exp_time=0.5, relative=False, snaked=True)
def optimise_kb(mirror):
"""
Runs a grid scan of a the upstream and downstream benders,
and plots a heatmap of the sample camera x or y sigma.
Parameters:
mirror: either "hfm" or :vfm"
"""
# Open a dock area and set up the heatmaps
dock_area = bec.gui.new(mirror)
wf1 = dock_area.new("Heatmap").new(bec.gui.available_widgets.Heatmap)
wfscan = dock_area.new("ScanControl").new(bec.gui.available_widgets.ScanControl)
cfg = getattr(MirrorConfig, mirror)
wf1.x_label = cfg["bu_name"]
wf1.y_label = cfg["bd_name"]
wf1.plot(
x_name=cfg["bu_name"],
y_name=cfg["bd_name"],
z_name=cfg["z_name"],
color_map="plasma",
)
# Run the scan
x_mot = cfg["x_device"]
y_mot = cfg["y_device"]
# scans.grid_scan(x_mot, -0.02, 0.02, 11, y_mot, -0.02, 0.02, 11,
# exp_time=0.5, relative=True, snaked=True)

244
pxii_bec/macros/pxii_energy.py Executable file
View File

@@ -0,0 +1,244 @@
"""Script to change energy at PXII by setting gap, DCM motors and mirror stripe
Moving DCM motors - implemented for Bragg, pitch and perp
Gap - optional, can be switched off using move_gap=False
Mirrors - change of mirror stripe is not yet implemented
Plotting optional
"""
import numpy as np
# from pxii_gap import set_gap
# from mx_methods import go_to_peak
# from pxii_parameters import EnergyDefaults, Calibration
# from calculator import (
# calc_perp_position,
# validate_energy,
# convert_from_bragg,
# convert_from_energy,
# )
def get_current_energy():
"""
Returns the energy in eV from the current bragg angle.
"""
current_bragg_angle = dev.dcm_bragg.user_readback.get()
current_energy = convert_from_bragg(current_bragg_angle, print_result=False)[
"energy_ev"
]
return current_energy
# Functions below are common to all beamlines
def calculate_energy_difference(current_energy, target_energy):
"""
Calculate the absolute difference in energy between the current energy level
and the target energy level.
"""
return abs(target_energy - current_energy)
def get_mirror_stripe(energy_ev):
"""
Determines the mirror stripe material based on the energy level provided
and the specified thresholds for silicon, rhodium, and platinum.
Args:
energy_ev (float): Energy level in electron volts, used to determine
the corresponding material type.
Returns:
str: A string indicating the material type corresponding to the provided
energy level. Possible values are "silicon", "rhodium", or "platinum".
"""
if energy_ev <= EnergyDefaults.stripe_thresholds["silicon"]:
return "silicon"
if (
EnergyDefaults.stripe_thresholds["silicon"]
< energy_ev
<= EnergyDefaults.stripe_thresholds["rhodium"]
):
return "rhodium"
return "platinum"
def set_mirror_stripe(energy_ev):
"""
Selects and sets the appropriate mirror stripe based on the given energy value
in electron volts (eV).
Args:
energy_ev (float): The energy value in electron volts
Returns:
None
"""
selected_stripe = get_mirror_stripe(energy_ev)
print(f"Selected mirror stripe: {selected_stripe}")
def mono_pitch_scan(plot=True):
"""Scan the monochromator pitch and move to the peak."""
if plot:
print("Scanning monochromator pitch and moving to peak, with plotting.")
go_to_peak(
EnergyDefaults.mono_pitch,
EnergyDefaults.signals["sig1"],
-EnergyDefaults.pitch_scan["halfwidth"],
EnergyDefaults.pitch_scan["halfwidth"],
steps=EnergyDefaults.pitch_scan["steps"],
relative=True,
settle=0.01,
plot=True,
confirm=False,
)
else:
print("Scanning monochromator pitch and moving to peak, without plotting.")
go_to_peak(
EnergyDefaults.mono_pitch,
EnergyDefaults.signals["sig1"],
-EnergyDefaults.pitch_scan["halfwidth"],
EnergyDefaults.pitch_scan["halfwidth"],
steps=EnergyDefaults.pitch_scan["steps"],
relative=True,
settle=0.01,
plot=False,
confirm=False,
)
# Specific functions - need to be edited for each beamline
def move_gap_if_needed(energy_ev, move_gap=False):
"""
Move the gap position based on energy if the move_gap flag is set.
Args:
energy_ev (float): The energy value in electron volts (eV) used for gap movement.
move_gap (bool): A flag indicating whether the gap position should be moved or not.
Returns:
None
"""
if move_gap:
set_gap(energy_ev)
else:
print("Not moving gap.")
# def get_roll(energy_ev):
# """Calculates the roll based on calibration data."""
# calib = Calibration()
# roll = np.poly1d(calib.roll)(energy_ev)
# if roll < 0:
# return 0
# if roll >= 9.95:
# return 9.95
# return float(roll)
def get_dcm_motors_positions(energy_ev):
"""
Pitch and roll are calculated based on fits of the measured calibration values.
Perp and Bragg are calculated based on the energy value via calculator.py.
Arguments:
energy_ev (float): The energy value in electron volts for which the
DCM motor positions are to be calculated.
Returns:
dict: A dictionary containing the calculated DCM motor positions
including values retrieved from the lookup table, Bragg angle
in milliradians, and perpendicular position.
"""
# dcm_motor_values = get_value_from_lut(energy_ev)
dcm_motor_values = {}
# pitch = float(np.poly1d(Calibration.pitch)(energy_ev))
pitch = -5.304
roll = EnergyDefaults.mono_roll_value
perp = calc_perp_position(energy_ev, print_result=False)
bragg_angle = convert_from_energy(energy_ev, print_result=False)["bragg_angle_mrad"]
dcm_motor_values.update(
{
"bragg_angle": bragg_angle,
"perp": perp,
"dcm_pitch": pitch,
"dcm_roll": roll,
}
)
return dcm_motor_values
def move_dcm_motors(energy_ev):
"""
Move the DCM bragg, pitch and perp motors to the required positions
for the given energy in eV.
"""
dcm_pos = get_dcm_motors_positions(energy_ev)
print(
f"Moving DCM bragg: {dcm_pos['bragg_angle']: .5g} mrad, "
f"DCM perp: {dcm_pos['perp']: .3g} mm, "
f"DCM pitch fixed at -5.304 mrad, "
# f"DCM pitch: {dcm_pos['dcm_pitch']: .5g} mrad, "
f"DCM Roll: {dcm_pos['dcm_roll']:.5g} V"
)
# umv(EnergyDefaults.energy, dcm_pos["bragg_angle"])
# umv(EnergyDefaults.mono_perp, dcm_pos["perp"])
# umv(EnergyDefaults.mono_pitch, dcm_pos["dcm_pitch"])
# umv(EnergyDefaults.mono_roll, dcm_pos["dcm_roll"])
# print("\n***DCM Roll movement is currently disabled ***\n")
umv(EnergyDefaults.energy, dcm_pos["bragg_angle"],
EnergyDefaults.mono_perp, dcm_pos["perp"],
EnergyDefaults.mono_pitch, dcm_pos["dcm_pitch"],
EnergyDefaults.mono_roll, dcm_pos["dcm_roll"])
def bl_energy(energy_ev, move_gap=False, mono_scan=True, plot=True):
"""
Adjusts the beamline's energy to the specified energy in electron volts (eV).
The function validates the target energy, checks the current energy, and makes
adjustments only if the energy difference is significant enough. It performs
necessary operations including moving the gap, changing DCM motors, updating
the mirror stripe, and scanning to find the optimal DCM pitch.
Args:
energy_ev: Target energy in electron volts to which the beamline should be adjusted.
move_gap: Boolean flag indicating whether to adjust the gap before setting energy.
plot: Boolean flag indicating whether to plot the DCM pitch scan for finding the peak.
Returns:
None
"""
energy_ev = validate_energy(energy_ev) # Ensure energy is valid.
# Check current energy to avoid unnecessary adjustments.
current_energy = get_current_energy()
energy_diff = calculate_energy_difference(current_energy, energy_ev)
if energy_diff <= EnergyDefaults.min_energy_change:
print(
f"Energy change of {energy_diff:.2f} eV is too small, not changing energy."
)
return
# Step 1: Move the gap if needed.
move_gap_if_needed(energy_ev, move_gap)
# Step 2: Move and set the DCM motors.
move_dcm_motors(energy_ev)
# Step 3: Update the mirror stripe.
set_mirror_stripe(energy_ev)
# Step 4: Perform DCM pitch scan and move to peak.
if mono_scan:
if plot:
mono_pitch_scan(plot=True)
else:
mono_pitch_scan(plot=False)

View File

@@ -0,0 +1,256 @@
"""File to store beamline parameters and defaults"""
from dataclasses import dataclass
import numpy as np
@dataclass(frozen=True)
class FitDefaults:
"""Default values for fitting routines"""
# Constants for default models, baselines, and parameters
MODEL = "Voigt"
BASELINE = "Linear"
SETTLE_TIME = 0.1
RELATIVE_MODE = True
@dataclass(frozen=True)
class EnergyDefaults:
"""Parameters for PXII energy changes"""
min_energy_change = 1
min_energy_ev = 4800
max_energy_ev = 30002
beam_offset = 6
signals = {
"sig1": dev.lu_bpmsum,
"sig2": dev.bsc_bpmsum,
"sig3": dev.bcu_bpmsum,
}
energy = dev.dcm_bragg
mono_pitch = dev.dcm_pitch
mono_perp = dev.dcm_perp
mono_roll = dev.dcm_froll
mono_roll_value = 4.65
LUT_table = "luts/energy_lut.csv"
stripe_thresholds = {"silicon": 9000, "rhodium": 20000, "platinum": 40000}
pitch_scan = {"halfwidth": 0.075, "steps": 20}
@dataclass(frozen=True)
class Calibration:
"""Calibration parameters for PXII optics"""
pitch = np.array([4.61823701e-14, -1.97330772e-09, 2.89694543e-05, -5.34468669e00])
roll = np.array([2.28291039e-03, -2.41928101e01])
@dataclass(frozen=True)
class Gap:
harmonics = {
"H3": np.array([9.15e-04, 4.49e-01]),
"H5": np.array([5.19e-04, 7.149e-01]),
"H7": np.array([3.57694643e-04, 8.73775476e-01]),
"H9": np.array([2.76335714e-04, 8.98471905e-01]),
"H11": np.array([2.2225e-04, 9.582e-01]),
"H13": np.array([1.9e-4, 9.262e-01]),
"H15": np.array([1.67e-4, 8.83e-01]),
}
# Define harmonic ranges as a constant
harmonic_ranges = {
"H3": (4900, 7000),
"H5": (7000, 10000),
"H7": (10000, 13000),
"H9": (13000, 16000),
"H11": (16000, 19000),
"H13": (19000, 22000),
"H15": (22000, float("inf")),
}
@staticmethod
def get_harmonic_by_energy(energy_ev: float):
"""
Determines the harmonic key based on the provided energy.
Args:
energy_ev (float): The energy value (eV).
Returns:
Optional[str]: The harmonic name (e.g., 'H3', 'H7') if the range matches, None otherwise.
"""
for harmonic, (low, high) in Gap.harmonic_ranges.items():
if low < energy_ev <= high:
return harmonic
return None
def get_harmonic_values(self, energy_ev: float):
"""
Retrieves the harmonic array based on the energy value.
Args:
energy_ev (float): The energy value (eV).
Returns:
Optional[np.array]: The corresponding array of harmonic values if the range matches, None otherwise.
"""
harmonic = self.get_harmonic_by_energy(energy_ev)
return self.harmonics.get(harmonic) if harmonic else None
@dataclass(frozen=True)
class Harmonics:
"""Anuschka's harmonics data for PXII U19 Undulator"""
min_gap_value = 4.5
map = {
3: np.array([1.07141725, -0.52834258]),
4: np.array([0.007111, -0.13678409, 1.52295567, -1.06882785]),
5: np.array([0.00315051, -0.0766359, 1.13107469, -0.86442062]),
6: np.array([0.00162862, -0.04452336, 0.82948259, -0.37329674]),
7: np.array([0.00080764, -0.02418882, 0.59212947, 0.15702886]),
8: np.array([1.16699242e-03, -4.68207543e-02, 9.43972396e-01, -1.94201019e00]),
9: np.array([0.00048419, -0.02010648, 0.55514114, -0.38180067]),
10: np.array([2.11583333e-04, -8.03503571e-03, 3.43141595e-01, 6.02318000e-01]),
11: np.array([0.00334392, -0.18405336, 3.58338994, -19.3640241]),
12: np.array([3.20520798e-05, 2.39253145e-03, 8.09198503e-02, 2.22897377e00]),
13: np.array([0.00278744, 0.07979874, 2.05143916]),
}
energy_ranges = {
3: (0, 7),
5: (7, 10),
7: (10, 13),
9: (13, 16),
11: (16, 19),
13: (19, 22),
}
high_energy = [(15, (23, 25)), (17, (25, 29)), (19, (29, float("inf")))]
@dataclass(frozen=True)
class CamConversion:
"""Convert pixels to microns for sam cam"""
a = 0.5208
b = 0.002586
@dataclass(frozen=True)
class BPMScans:
"""Define the names of the motors and bpm channels"""
fe = {
"x_name": dev.fe_bpm_x.name,
"y_name": dev.fe_bpm_y.name,
"z1_name": dev.fe_bpm1.name,
"z2_name": dev.fe_bpm2.name,
"z3_name": dev.fe_bpm3.name,
"z4_name": dev.fe_bpm3.name,
"z5_name": dev.fe_bpmsum.name,
"x_device": dev.fe_bpm_x,
"y_device": dev.fe_bpm_y,
}
lu = {
"x_name": dev.lu_bpm_x.name,
"y_name": dev.lu_bpm_y.name,
"z1_name": dev.lu_bpm1.name,
"z2_name": dev.lu_bpm2.name,
"z3_name": dev.lu_bpm3.name,
"z4_name": dev.lu_bpm4.name,
"z5_name": dev.lu_bpmsum.name,
"x_device": dev.lu_bpm_x,
"y_device": dev.lu_bpm_y,
}
bsc = {
"x_name": dev.bsc_bpm_x.name,
"y_name": dev.bsc_bpm_y.name,
"z1_name": dev.bsc_bpm1.name,
"z2_name": dev.bsc_bpm2.name,
"z3_name": dev.bsc_bpm3.name,
"z4_name": dev.bsc_bpm4.name,
"z5_name": dev.bsc_bpmsum.name,
"x_device": dev.bsc_bpm_x,
"y_device": dev.bsc_bpm_y,
}
bcu = {
"x_name": dev.bcu_bpm_x.name,
"y_name": dev.bcu_bpm_y.name,
"z1_name": dev.bcu_bpm1.name,
"z2_name": dev.bcu_bpm2.name,
"z3_name": dev.bcu_bpm3.name,
"z4_name": dev.bcu_bpm4.name,
"z5_name": dev.bcu_bpmsum.name,
"x_device": dev.bcu_bpm_x,
"y_device": dev.bcu_bpm_y,
}
@dataclass(frozen=True)
class MirrorConfig:
"""Define the names of the mirror channels"""
hfm = {
"bu_name": dev.hfm_bu.name,
"bd_name": dev.hfm_bd.name,
"z_name": dev.samcam_xsig.name,
"x_device": dev.hfm_bu,
"y_device": dev.hfm_bd,
}
vfm = {
"bu_name": dev.vfm_bu.name,
"bd_name": dev.vfm_bd.name,
"z_name": dev.samcam_ysig.name,
"x_device": dev.vfm_bu,
"y_device": dev.vfm_bd,
}
from typing import Callable
@dataclass
class Target:
inpos: float
outpos: float
tol: float
mot: str
reader: Callable[[], float]
@property
def actual(self):
return self.reader()
def checkpos(self):
return abs(self.actual - self.inpos) <= self.tol
def mvin(self):
umv(self.mot, self.inpos)
def mvout(self):
umv(self.mot, self.outpos)
@dataclass
class GroupTarget:
def __init__(self, **targets: Target):
self.targets = targets
def checkpos(self):
return all(t.checkpos() for t in self.targets.values())
def report(self):
return {name: t.checkpos() for name, t in self.targets.items()}
@dataclass(frozen=True)
class SE:
"""Define settings for scintillator, collimator, i1"""
scin = Target(38.6, 20.0, 0.1, dev.scin_y, lambda: dev.scin_y.read()['scin_y']['value'])
i1 = Target(44.0, 20.0, 0.2, dev.scin_y, lambda: dev.scin_y.read()['scin_y']['value'])
colly = Target(41.5, 20.0, 0.05, dev.coll_y, lambda: dev.coll_y.read()['coll_y']['value'])
bsy = Target(0.1, -0.9, 0.05, dev.bs_y, lambda: dev.bs_y.read()['bs_y']['value'])
bsx = Target(2.45, 2.45, 0.05, dev.bs_x, lambda: dev.bs_x.read()['bs_x']['value'])
# coll = GroupTarget(
# x = Target(0.0517, 0.0517, 0.02, dev.coll_x, lambda: dev.coll_x.read()['coll_x']['value']),
# y = Target(41.5, 20.0, 0.05, dev.coll_y, lambda: dev.coll_y.read()['coll_y']['value']),
# )
# bs = GroupTarget(
# x = Target(2.65, 2.65, 0.05, dev.bs_x, lambda: dev.bs_x.read()['bs_x']['value']),
# y = Target(0.1, 0.1, 0.05, dev.bs_y, lambda: dev.bs_y.read()['bs_y']['value'])
# )