From 83e5acb01233dd3b8db5e0d23ef50d602c35294b Mon Sep 17 00:00:00 2001 From: x10sa Date: Mon, 26 Jan 2026 10:34:50 +0100 Subject: [PATCH] added AP and KM macros --- pxii_bec/macros/APscripts.py | 1522 ++++++++++++++++++++++++++++ pxii_bec/macros/Undu_Helpers.py | 320 ++++++ pxii_bec/macros/__init__.py | 0 pxii_bec/macros/calculator.py | 355 +++++++ pxii_bec/macros/flux_di.py | 126 +++ pxii_bec/macros/mx_basics.py | 254 +++++ pxii_bec/macros/mx_methods.py | 344 +++++++ pxii_bec/macros/pxii_energy.py | 244 +++++ pxii_bec/macros/pxii_parameters.py | 256 +++++ 9 files changed, 3421 insertions(+) create mode 100755 pxii_bec/macros/APscripts.py create mode 100755 pxii_bec/macros/Undu_Helpers.py mode change 100644 => 100755 pxii_bec/macros/__init__.py create mode 100755 pxii_bec/macros/calculator.py create mode 100755 pxii_bec/macros/flux_di.py create mode 100755 pxii_bec/macros/mx_basics.py create mode 100755 pxii_bec/macros/mx_methods.py create mode 100755 pxii_bec/macros/pxii_energy.py create mode 100755 pxii_bec/macros/pxii_parameters.py diff --git a/pxii_bec/macros/APscripts.py b/pxii_bec/macros/APscripts.py new file mode 100755 index 0000000..ea26559 --- /dev/null +++ b/pxii_bec/macros/APscripts.py @@ -0,0 +1,1522 @@ +################################ +### Scripts/"Macros" for BEC: + +################################ + +# Python scripts for running scan plans in BEC are to be stored in +# ~/bec/scripts/ + +import numpy as np +import math +import matplotlib.pyplot as plt + + +################################ +### BASIC BL PROGRAMS + +##################### +### a2e +##################### + + +def a2e(a, *hkl): + import math + + # possible keywords: # dictionary in python + # DEG = deg + # LN = ln + + i = 0 + count = 0 + h = [1, 1, 1] + iln = 0 + ideg = 0 + while i < 3 and count < len(hkl): + for var in hkl: + count = count + 1 + # print(var) + if type(var) == int: + print("var int ", var, i) + h[i] = var + i = i + 1 + + if "ln" in hkl: + iln = 1 + if "deg" in hkl: + ideg = 1 + + d0 = ( + 2 + * 5.43102 + * (1 - 2.4e-4 * iln) + / np.sqrt(h[0] ** 2.0 + h[1] ** 2.0 + h[2] ** 2.0) + ) + + if ideg or (a > 1): + a = math.radians(a) # *math.pi/180. + + en = 12.39842 / d0 / math.sin(a) + + return en + + +##################### +### angle +##################### + + +def angle(e, *hkl): + import math + + # possible keywords: # dictionary in python + # DEG = deg + # LN = ln + + i = 0 + count = 0 + h = [1, 1, 1] + iln = 0 + ideg = 0 + while i < 3 and count < len(hkl): + for var in hkl: + count = count + 1 + # print(var) + if type(var) == int: + print("var int ", var, i) + h[i] = var + i = i + 1 + + if "ln" in hkl: + iln = 1 + if "deg" in hkl: + ideg = 1 + + d0 = ( + 2 + * 5.43102 + * (1 - 2.4e-4 * iln) + / np.sqrt(h[0] ** 2.0 + h[1] ** 2.0 + h[2] ** 2.0) + ) + a = math.asin(12.39842 / d0 / e) + + if ideg: + a = math.degrees(a) # *180./math.pi + + return a + + +##################### +### setenergy +##################### +##################### + + +def sete(e, rock=1): + + bragg_ang = angle(e, "ln") * 1000.0 + umv(dev.dcm_bragg, bragg_ang) + setu19(e, detune=1) + + if rock: + rock() + + return + + +##################### +### getenergy +##################### + + +def getenergy(): + + ang_bragg_rb = dev.dcm_bragg.read()["dcm_bragg"]["value"] + # dev.dcm_bragg.user_readback.get() + + ang_bragg = ang_bragg_rb / 1000.0 # mrad to rad + energy = a2e(ang_bragg, "ln") + + return round(energy, 5) + + +##################### +### ROCK +##################### + + +def rock(**kwargs): + from lmfit.models import LinearModel, GaussianModel # LorentzianModel + import matplotlib.pyplot as plt + + dock_area = bec.gui.new() + wr = dock_area.new().new(bec.gui.available_widgets.Waveform) + + # width of rocking curve of perfect xtal: at 20 keV: 14 urad == 0.0008 deg + + # keywords: + # silent (no terminal output) + # ffail (success? repeat ?) + # time + # dth - change range of rocking curve (e.g.,if far off) + + ### axis [0/1] 0: Bragg ; 1: pitch + if "axis" in list(kwargs): + ax = kwargs["axis"] + print("Rocked MonoAxis is dcm_bragg") + else: + ax = 0 + # print('Rocked MonoAxis is Th1') + print("Rocked MonoAxis is dcm_pitch") + + ### waiting time + if "time" in list(kwargs): + time = kwargs["time"] + else: + time = 0.3 + print("Time is default of 0.3 s") + print("Time is %s s" % time) + + e = getenergy() + + ### width of rocking curve + if "dtheta" in list(kwargs): + dx = kwargs["dtheta"] + else: + # dx = 0.05/e ### width: 0.1/6 to 0.1/30 ~~ 0.017 to 0.0033 ###### in deg + dx = 0.1 # in mrad## width: 0.1/6 to 0.1/30 ~~ 0.017 to 0.0033 + dx = 0.6 / e * 2 + print("Rocking width is %s mrad" % dx) + + det = dev.lu_bpmsum + mot = dev.dcm_pitch + motb = dev.dcm_bragg + + mot_rb = mot.user_readback.get() + motb_rb = motb.user_readback.get() + # or pitch0 = dev.dcm_pitch.read()['dcm_pitch']['value'] + pos0 = mot_rb + + print("Position of bragg angle is %s mrad" % motb_rb) + print("Position of bragg pitch is %s mrad" % mot_rb) + + if ax == 1: + mot = motb + pos0 = motb_rb + + s = scans.line_scan(mot, -dx, dx, steps=50, exp_time=time, relative=True) + # md = scan.metadata["bec"] + wr.title = f"RockingScan at Energy of {e}" + wr.plot(x_name=mot.name, y_name=det.name) ##set names/axes first ! + wr.x_label = mot.name + wr.y_label = det.name + if ax == 0: + data_x = s.scan.live_data.dcm_pitch.dcm_pitch.val + else: + data_x = s.scan.live_data.dcm_bragg.dcm_bragg.val + data_y = s.scan.live_data.lu_bpmsum.lu_bpmsum.val + + wr.plot(data_x, data_y) + + # fit Gaussian + lin BG + peak = GaussianModel() + background = LinearModel() + model = peak + background + maxy = max(data_y) + indmax = np.argmax(data_y) + xm = data_x[indmax] + + p = model.make_params(amplitude=maxy, center=xm) + g = model.fit(data_y, p, x=data_x) + print(f'Center: {g.params["center"].value:.5f} FWHM: {g.params["fwhm"].value:.5f}') + + plt.ion() + plt.figure(1) + plt.plot(data_x, data_y, "*") + plt.plot(data_x, g.best_fit) + plt.show() + + res_center = g.params["center"].value + + diff_movement = abs(res_center - pos0) + if diff_movement < 0.25: + umv(mot, res_center) + else: + print("Fit failed, no movement") + + return + + +################################ +### FITTING PROGRAMS + +############################# +### fit and plot of history +############################# + + +def justfit(data_x, data_y, model="gauss", ibg=0): + """ + just fit a fct of Gauss, Voigt or Lorentzian type, + Examples: + gfit, xmax = justfit(data_x, data_y, model = "gauss", ibg =1) : Gauss, lin BG + gfit, xmax = justfit(data_x, data_y, model = "voigt", ibg =2) : Voigt, quadrat BG + gfit, xmax = justfit(data_x, data_y, model = "lorentz", ibg =0) : Lorentzian, no BG + """ + + from lmfit.models import ( + LinearModel, + GaussianModel, + VoigtModel, + QuadraticModel, + LorentzianModel, + ) + import matplotlib.pyplot as plt + + peak = GaussianModel() + if model == "voigt": + peak = VoigtModel() + if model == "lorentz": + peak = LorentzianModel() + model = peak + + if ibg: + # add a BG + background = LinearModel() + if ibg > 1: + background = QuadraticModel() + model = peak + background + + ### define data_x and data_y: + maxy = max(data_y) + indmax = np.argmax(data_y) + xm = data_x[indmax] + sigma = 1.0 + gamma = 0.2 # blurring/widening of the sigma ; the larger, the more of a Lorentzian profile + + ## possible, but not necessarily better ... : + # sl = 0.00001 * (data_y[-2]-data_y[2])/(data_x[-2]-data_x[2]) # small slope + # ic = min(data_y) + # print(f'slope and intercept initial conds= {sl, ic}') + ## + print("maxy, indmax, xm = ", maxy, indmax, xm) + + p = model.make_params(amplitude=maxy, center=xm) + # other possibilities ... : + # p = model.make_params(amplitude = maxy, center=xm , slope=0, intercept = ic) + # p = model.make_params(amplitude = max(data_y), center=xm, slope=0, intercept = min(data_y)) + + p["center"].set(min=min(data_x), max=max(data_x)) + p["sigma"].set(min=0, max=(max(data_x) - min(data_x)) / 2.0) + # other possibilities ... : + # p['amplitude'].set(min=min(data_y), max=max(data_y)) + # p['intercept'].set(max=(max(data_y)-min(data_y))/2.) + + g = model.fit(data_y, p, x=data_x) + + # diagnostics + # print(f'Gfit: {g.params}') + + print(f'Center of {model} fit: {g.params["center"].value:.5f}') + print(f'Sigma: {g.params["sigma"].value:.5f}') + print(f'FWHM: {g.params["fwhm"].value:.5f}') + print(f"Position of maximum: {xm}") + + return g, xm + + +def fit_plothist(hindex: int, signal_name: str, model="gauss", ibg=0): + """ + Get data for a completed scan from the BEC history, (gaussian, voigt, lorentzian + zero, lin, quadrat BG) fit, + and plot all the results. + + Args: + history_index (int): scan to fetch, e.g. -1 for the most recent scan + signal_name (str): the monitor + + Example: + gcen, xm , data_x, data_y = fit_plothist(-10, "lu_bpmsum",model="gauss", ibg=2) + + """ + + from lmfit.models import ( + LinearModel, + GaussianModel, + VoigtModel, + QuadraticModel, + LorentzianModel, + ) + import matplotlib.pyplot as plt + + h = bec.history[hindex] + md = h.metadata["bec"] + scanvar = list(md["args"].keys())[ + 0 + ] # string, returns the variable of the last performed scan + # data = h.devices[device_name][signal].read()["value"] + # data_x = h.devices.dcm_pitch.dcm_pitch.read()["value"] + # data_y = h.devices.lu_bpmsum.lu_bpmsum.read()["value"] + # data_x = h.devices[device_name][device_name].read()["value"] + + data_xd = md["positions"] # last scan knows which device ... however, double array + data_x = data_xd.flatten() + print("dx = ", data_x) + data_y = h.devices[signal_name][signal_name].read()["value"] + print("dy = ", data_y) + + g, xm = justfit(data_x, data_y, model=model, ibg=ibg) + + plt.ion() + plt.figure() + plt.plot(data_x, data_y, "*") + plt.plot(data_x, g.best_fit) + plt.xlabel(scanvar) + plt.ylabel(signal_name) + plt.show() + + gcen = g.params["center"].value + + return gcen, xm, data_x, data_y + + +######################### +### fit and plot ONLY +######################### + + +def fit_plot(data_x, data_y, model="gauss", ibg=1, fitrange=0, fitclick=0): + """ + Get data for a completed scan, gaussian, voigt or lorentzian fit, + default = gauss, plus linear background + and plot all the results; + possibly select the range before [fitrange = 1], you will be asked to enter the x-range + OR choose interactively [[fitclick = 1], you will be asked to click + Args: + datax , datay + Example: + fit_plot(dx,dy, model='voigt', fitrange=0, fitclick=1) + + """ + + from lmfit.models import LinearModel, GaussianModel, VoigtModel, LorentzianModel + import matplotlib.pyplot as plt + import numpy as np + + print("dy = ", data_y) + + ####### check the order of the data: lower --> upper ? if not, reverse all the indices! + sz = len(data_x) + if data_x[sz - 1] < data_x[0]: + # reverse indices + data_x = data_x[::-1] + data_y = data_y[::-1] + + if fitrange: + x1, x2 = input("Please enter the x-range for the fit as x1, x2 ").split(",") + x1 = float(x1) + x2 = float(x2) + closest_low = np.abs(np.array(data_x) - x1).argmin() + closest_high = np.abs(np.array(data_x) - x2).argmin() + print("low_ind", closest_low) + print("high_ind", closest_high) + + data_x = data_x[closest_low:closest_high] + data_y = data_y[closest_low:closest_high] + print("dx=", data_x) + print("dy=", data_y) + + ##### + peak = GaussianModel() + if model == "voigt": + peak = VoigtModel() + if model == "lorentz": + peak = LorentzianModel() + + if ibg: + background = LinearModel() + model = peak + background + + ### define data_x and data_y: + maxy = max(data_y) + indmax = np.argmax(data_y) + xm = data_x[indmax] + sigma = 1.0 + gamma = 0.2 # blurring/widening of the sigma ; the larger, the more of a Lorentzian profile + print("maxy, indmax, xm = ", maxy, indmax, xm) + #p = model.make_params( + # amplitude=max(data_y), center=xm, slope=0, intercept=min(data_y) + #) + p = model.make_params(amplitude=maxy, center=xm) + p["center"].set(min=min(data_x), max=max(data_x)) + p["sigma"].set(min=0, max=(max(data_x) - min(data_x)) / 2.0) + + g = model.fit(data_y, p, x=data_x) + + # diagnostics + # print(f'Gfit: {g.params}') + + print(f'Center of {model} fit: {g.params["center"].value:.5f}') + print(f'Sigma: {g.params["sigma"].value:.5f}') + print(f'FWHM: {g.params["fwhm"].value:.5f}') + print(f"Position of maximum: {xm}") + + ##### + + plt.ion() + plt.figure() + plt.plot(data_x, data_y, "*") + plt.plot(data_x, g.best_fit) + plt.xlabel("x-axis") + plt.ylabel("signal") + plt.show() + + if fitclick: + print("Please define the x-range of the fit via click of graph") + print("First click for start: ") + x1 = plt.ginput(1) + + print("Second click for end: ") + x2 = plt.ginput(1) + + print("x1 = ", x1) + print("x2 = ", x2) + + x1 = float(x1[0][0]) ## first element of the tupel in the list + x2 = float(x2[0][0]) + print("x1,x2 = ", x1, x2) + closest_low = np.abs(data_x - x1).argmin() + closest_high = np.abs(data_x - x2).argmin() + print("low_ind", closest_low) + print("high_ind", closest_high) + + data_x = data_x[closest_low:closest_high] + data_y = data_y[closest_low:closest_high] + print("dx=", data_x) + print("dy=", data_y) + + g, xm = justfit(data_x, data_y, model=model, ibg=ibg) + + plt.figure() + plt.plot(data_x, data_y, "*") + plt.plot(data_x, g.best_fit) + + plt.xlabel("x-axis") + plt.ylabel("signal") + plt.show() + + gcen = g.params["center"].value + + return gcen, xm + + +#################################### +### just save data +#################################### +def save_data(hindex: int, device_name: str, signal_name: str): + """ + Get data for a completed scan from the BEC history, + and plot all the results, store in a CSV file + + Args: + history_index (int): scan to fetch, e.g. -1 for the most recent scan + device_name (str): the device for which to get the monitoring data + signal_name (str): the monitor + """ + + h = bec.history[hindex] + md = h.metadata["bec"] + + print("Scan argument was ", md["args"]) + data_x = h.devices[device_name][device_name].read()["value"] + data_y = h.devices[signal_name][signal_name].read()["value"] + + plt.ion() + plt.figure() + plt.plot(data_x, data_y, ".") + + plt.title(f"Scan of {device_name}") + plt.xlabel(f"{device_name}") + plt.ylabel(f"{signal_name} / AU") + + plt.show() + ans = "n" + ans = input("Store data in csv file? y/n ") + if ans == "y": + dirname = "/home/gac-x10sa/Data/" + # writing output to simple data file for later analysis: + combined = np.column_stack((data_x, data_y)) + filename = dirname + "Scan" + str(hindex) + device_name + ".txt" + with open(filename, "w") as f: + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + return data_x, data_y + + +######################################### +### just retried the saved data from csv +######################################## +def read_data(filename: str): + """ + Get data stored in a CSV file + + Args: + filename (str): the csv file, eg, of a Scan + """ + + # dirname = '/home/gac-x10sa/Data/' + # read output to simple data file for later analysis: + + with open(filename, "r") as f: + combined_data = np.loadtxt(f, delimiter=",") # no header and fmt for load + + rows, cols = combined_data.shape + + print("No of Rows =", rows) + print("No of Colums =", cols) + if cols > 2: + print( + "only first 2 colums (data[:, 0] and data[:, 1]) are plotted, please consider the other columns as well!" + ) + index1 = 0 + index2 = 1 + index1, index2 = input( + "please enter the columns to be plotted vs first one [eg: 1,2] : " + ).split(",") + ind1 = int(index1) + ind2 = int(index2) + + if ind2 > cols - 1: + print(" no valid index") + + data_x = combined_data[:, ind1] + data_y = combined_data[:, ind2] + + plt.ion() + plt.figure() + plt.plot(data_x, data_y, ".") + + plt.title(f"Scan from {filename}") + plt.xlabel(f"data column {ind1}") + plt.ylabel(f"data column {ind2}") + + plt.show() + + return combined_data + + +#################################### +### save and plot the long gapscans +#################################### + + +def save_plot_gaps(hindex: int, device_name: str, signal_name: str): + """ + Get data for a completed scan from the BEC history, + and plot all the results, store in a CSV file + + Args: + history_index (int): scan to fetch, e.g. -1 for the most recent scan + device_name (str): the device for which to get the monitoring data + signal_name (str): the monitor + """ + + from lmfit.models import LinearModel, GaussianModel + import matplotlib.pyplot as plt + + h = bec.history[hindex] + md = h.metadata["bec"] + # data = h.devices[device_name][signal].read()["value"] + # data_x = h.devices.dcm_pitch.dcm_pitch.read()["value"] + # data_y = h.devices.lu_bpmsum.lu_bpmsum.read()["value"] + data_x = h.devices[device_name][device_name].read()["value"] + data_x = data_x / 1000.0 + + d0 = 2 * 5.43102 * (1 - 2.4e-4 * 1) / np.sqrt(3.0) + en_vec = 12.39842 / d0 / np.sin(data_x) + + data_y = h.devices[signal_name][signal_name].read()["value"] + + plt.ion() + plt.figure() + # plt.plot(data_x,data_y,'*') + plt.plot(en_vec, data_y) + plt.title("Energy Scan") + plt.xlabel("E/keV") + plt.ylabel("LU BPM Sum Signal / AU") + + plt.show() + + dirname = "/home/gac-x10sa/Data/" + # writing output to simple data file for later analysis: + combined = np.column_stack((en_vec, data_y)) + filename = dirname + "EnScan" + str(hindex) + ".txt" + with open(filename, "w") as f: + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + return + + +##################### +### readmonitors() +##################### + + +def getdiodepos(diode="i1"): + """ + check if diode is in position + i1 = in KBox, below scinti + i2 = below Eiger + """ + + diode_in = 1 + dpos = 44 # mm + measdev = dev.scin_y + diodepos_rb = measdev.user_readback.get() + if abs(dpos - diodepos_rb) > 0.1: + print("Diode not in, please move") + diode_in = 0 # sys.exit(0) + + if diode == "i2": + detpos_diode = 214.5 + detpos_rb = dev.det_z.user_readback.get() + print("using Det diode requires det up and close and beamstop out") + if abs(detpos - detpos_rb) > 0.5: + print("Diode not in, please move Det up") + diode_in = 0 # sys.exit(0) + + return diode_in + + +def read_mon(): + from datetime import datetime + + e = getenergy() + fesum = dev.fe_bpmsum.read()["fe_bpmsum"]["value"] + lusum = dev.lu_bpmsum.read()["lu_bpmsum"]["value"] + bscsum = dev.bsc_bpmsum.read()["bsc_bpmsum"]["value"] + + # Mono + bragg = dev.dcm_bragg.read()["dcm_bragg"]["value"] + pitch = dev.dcm_pitch.read()["dcm_pitch"]["value"] + perp = dev.dcm_perp.read()["dcm_perp"]["value"] + fpitch = dev.dcm_fpitch.read()["dcm_fpitch"]["value"] + froll = dev.dcm_froll.read()["dcm_froll"]["value"] + + gap = dev.id_gap.read()["id_gap"]["value"] + + # KB V + vbu = dev.vfm_bu.read()["vfm_bu"]["value"] + vbd = dev.vfm_bd.read()["vfm_bd"]["value"] + vbpitch = dev.vfm_pitch.read()["vfm_pitch"]["value"] + vbyaw = dev.vfm_yaw.read()["vfm_yaw"]["value"] + vbroll = dev.vfm_roll.read()["vfm_roll"]["value"] + vblat = dev.vfm_lat.read()["vfm_lat"]["value"] + vbvert = dev.vfm_vert.read()["vfm_vert"]["value"] + # KB H + hbu = dev.hfm_bu.read()["hfm_bu"]["value"] + hbd = dev.hfm_bd.read()["hfm_bd"]["value"] + hbpitch = dev.hfm_pitch.read()["hfm_pitch"]["value"] + hbyaw = dev.hfm_yaw.read()["hfm_yaw"]["value"] + hbroll = dev.hfm_roll.read()["hfm_roll"]["value"] + hblat = dev.hfm_lat.read()["hfm_lat"]["value"] + hbvert = dev.hfm_vert.read()["hfm_vert"]["value"] + + ## FE slits centre and size + fe_sx_cen = dev.fe_sxcen.read()["fe_sxcen"]["value"] + fe_sx_size = dev.fe_sxsize.read()["fe_sxsize"]["value"] + fe_sy_cen = dev.fe_sycen.read()["fe_sycen"]["value"] + fe_sy_size = dev.fe_sysize.read()["fe_sysize"]["value"] + + ## BSF slits centre and size + s1_xcen = dev.s1_xcen.read()["s1_xcen"]["value"] + s1_xsize = dev.s1_xsize.read()["s1_xsize"]["value"] + s1_ycen = dev.s1_ycen.read()["s1_ycen"]["value"] + s1_ysize = dev.s1_ysize.read()["s1_ysize"]["value"] + + ## BSC slits centre and size + s2_xcen = dev.s2_xcen.read()["s2_xcen"]["value"] + s2_xsize = dev.s2_xsize.read()["s2_xsize"]["value"] + s2_ycen = dev.s2_ycen.read()["s2_ycen"]["value"] + s2_ysize = dev.s2_ysize.read()["s2_ysize"]["value"] + + ## BCU slits centre and size + s3_xcen = dev.s3_xcen.read()["s3_xcen"]["value"] + s3_xsize = dev.s3_xsize.read()["s3_xsize"]["value"] + s3_ycen = dev.s3_ycen.read()["s3_ycen"]["value"] + s3_ysize = dev.s3_ysize.read()["s3_ysize"]["value"] + + ## move in screen in BSC chamber and get size and position + ## move out again + + # umv(dev.samcam_xmot, 1) + ## move in screen at sample position and get size and position + + # move up diode and get reading + + ## move out again + + ## move in x-ray eye below detector and get size and position + + # read out DetDiode and calc flux + + ## move out again + + xeye_x_pos = dev.xeye_x.read()["xeye_x"]["value"] + bcusum = 0 + i1signal = 0 + if xeye_x_pos < 0: + bcusum = dev.bcu_bpmsum.read()["bcu_bpmsum"]["value"] + i1signal = dev.i1.read()["i1"]["value"] + print("Energy = ", e, " keV") + print(f"fesum,lusum,bscsum,bcusum,i1signal = {fesum,lusum,bscsum,bcusum,i1signal}") + + print( + f"bragg, pitch, perp, fpitch, froll, gap = {bragg, pitch,perp, fpitch, froll, gap}" + ) + # return e, fesum,lusum,bscsum,bcusum,i1signal + print("KB VERT") + print( + f"vbu, vbd, vbpitch,vbyaw,vbroll,vblat,vbvert = {vbu, vbd, vbpitch,vbyaw,vbroll,vblat,vbvert}" + ) + # return vbu, vbd, vbpitch,vbyaw,vbroll,vblat,vbvert + print("KB HORI") + print( + f"hbu, hbd, hbpitch,hbyaw,hbroll,hblat,hbvert = {hbu, hbd, hbpitch,hbyaw,hbroll,hblat,hbvert}" + ) + # return hbu, hbd, hbpitch,hbyaw,hbroll,hblat,hbvert + ## dump status in CSV + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + dirname = "/home/gac-x10sa/Data/" + filename = dirname + f"BLstatus_{timestamp}.txt" + with open(filename, "w") as f: + combined = np.column_stack((e, gap)) + f.write("En and gap\n") + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + f.write("bragg, pitch,perp, fpitch, froll\n") + combined = np.column_stack((bragg, pitch, perp, fpitch, froll)) + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + f.write("vbu, vbd, vbpitch,vbyaw,vbroll,vblat,vbvert\n") + combined = np.column_stack((vbu, vbd, vbpitch, vbyaw, vbroll, vblat, vbvert)) + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + f.write("hbu, hbd, hbpitch,hbyaw,hbroll,hblat,hbvert\n") + combined = np.column_stack((hbu, hbd, hbpitch, hbyaw, hbroll, hblat, hbvert)) + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + f.write("FE slits\n") + combined = np.column_stack((fe_sx_cen, fe_sx_size, fe_sy_cen, fe_sy_size)) + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + f.write("BSF slits\n") + combined = np.column_stack((s1_xcen, s1_xsize, s1_ycen, s1_ysize)) + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + f.write("BSC slits\n") + combined = np.column_stack((s2_xcen, s2_xsize, s2_ycen, s2_ysize)) + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + f.write("BCU slits\n") + combined = np.column_stack((s3_xcen, s3_xsize, s3_ycen, s3_ysize)) + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + f.write("fesum,lusum,bscsum,bcusum,i1signal\n") + combined = np.column_stack((fesum, lusum, bscsum, bcusum, i1signal)) + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + return + + +##################### +###long bragg scan +##################### + + +def longscan(): + # scan the entire energy range from 5 keV (bragg angle = 406.512 mrad) to 30 keV (bragg angle = 65.949 mrad) + # "snakewise" + import time + + umv(dev.id_gap, 4.5) + time.sleep(0.2) + s = scans.line_scan( + dev.dcm_bragg, 406.5, 65.9, steps=1500, exp_time=0.05, relative=False + ) + time.sleep(2) + umv(dev.id_gap, 5.0) + time.sleep(0.2) + s = scans.line_scan( + dev.dcm_bragg, 65.9, 406.5, steps=1500, exp_time=0.05, relative=False + ) + time.sleep(2) + umv(dev.id_gap, 5.5) + time.sleep(0.2) + s = scans.line_scan( + dev.dcm_bragg, 406.5, 65.9, steps=1500, exp_time=0.05, relative=False + ) + time.sleep(2) + umv(dev.id_gap, 6.0) + time.sleep(0.2) + s = scans.line_scan( + dev.dcm_bragg, 65.9, 406.5, steps=1500, exp_time=0.05, relative=False + ) + time.sleep(2) + umv(dev.id_gap, 6.5) + time.sleep(0.2) + s = scans.line_scan( + dev.dcm_bragg, 406.5, 65.9, steps=1500, exp_time=0.05, relative=False + ) + + +##################### +### colliscans +##################### +def colliscan(direction: str, range=0.3, nsteps=30, stime=0.5, centre=1): + """ + Colli scan, and possible centre + and plot all the results. + + Args: + direction (str): the direction [x/y] + range: default = 0.3 + nsteps: default = 30 + settling time: default = 0.5 + move to centre or not [0/1] + + Example: colliscan("h", centre=1) + + """ + import sys + + dock_area = bec.gui.new() + wr = dock_area.new().new(bec.gui.available_widgets.Waveform) + + # check if i1 DIODE is IN + # if not, aks to be moved + + diodeinpos = 44 # mm + colli_up = 41 # mm + + measdev = dev.scin_y + diodepos_rb = measdev.user_readback.get() + if abs(diodeinpos - diodepos_rb) > 0.1: + print("Diode not in, please move") + sys.exit(0) + + signal_name = "i1" + + dx = range # scan range + + if direction in ("x", "h"): + mot = dev.coll_x + + else: + mot = dev.coll_y + # mot_name = "coll_y" + + motname = mot.name + + # note current motor positions + curr_rbx = dev.coll_x.user_readback.get() + curr_rby = dev.coll_y.user_readback.get() + curr_rb = mot.user_readback.get() + print(f"Motor position before scan: {curr_rb}") + if abs(colli_up - curr_rby) > 1.5: + print(f"Colli vert pos = {curr_rb}, in-pos = {colli_up}") + print("Colli not up, please move") + sys.exit(0) + + s = scans.line_scan(mot, -dx, dx, steps=nsteps, settling_time=stime, relative=True) + + h = s.scan.data # this is a ScanDataContainer and can get the MetaData + # md = h.metadata["bec"] + data_x = h.devices[motname][motname].read()["value"] + # data_y = h.devices[signal_name][signal_name].read()["value"] # does not work for signal + # data_y = h.devices.signal_name.signal_name.read() # timestamp AND value , dir + data_y = h.devices.i1.i1.read()["value"] + + cen, peak = fit_plot(data_x, data_y) + + ans = "n" + if abs(cen - curr_rb) > dx: + print("Fit failed, not moving") + ans = input("Going to peak value? y/n ") + if ans == "y": + umv(mot, peak) + else: + if centre: + print(f"Moving to new colli position of {cen}") + umv(mot, cen) + else: + print(f"Scan finished, going back to previous position") + umv(mot, curr_rb) + return + + +##################### +### slitscans +##################### +def slitscan(device_location: str, direction: str, range: 1, nsteps=50, centre=0): + """ + Slit scan, and possible centre + and plot all the results. + + Args: + device_location (str): the slits for which to get the monitoring data + direction (str): the direction [h/v] + nsteps: default = 50 + move to centre or not [0/1] + + Example: slitscan("fe","h", 30, centre=1) + + """ + + from lmfit.models import LinearModel, GaussianModel + import matplotlib.pyplot as plt + + time = 0.1 + dx = range # scan range + + # FE slits ================================ + if device_location in ["fe"]: + default_h = 1.0 ## 1.4 or 0.4 or ??? + default_v = 1.0 ## 1.2 + det = dev.lu_bpmsum + if direction == "h": + mot = dev.fe_sxcen + size = dev.fe_sxsize + s_closed = 0.1 + s_open = default_h + else: + mot = dev.fe_sycen + size = dev.fe_sysize + s_closed = 0.1 + s_open = default_v + + # BSF slits ================================ + if device_location in ["bsf", "s1"]: + default_h = 3.0 # 8125 + default_v = 3.0 # ??? close more ???? # 8149.8 + det = dev.lu_bpmsum + # det = dev.bsc_bpmsum or #det = dev.bcu_bpmsum would also work + if direction == "h": + mot = dev.s1_xcen + size = dev.s1_xsize + s_closed = 0.1 + s_open = default_h + else: + mot = dev.s1_ycen + size = dev.s1_ysize + s_closed = 0.1 + s_open = default_v + + # BSC slits ================================ + + if device_location in ["bsc", "s2", "ss"]: + default_h = 6.0 # ??? + default_v = 5.0 # ??? close more ???? # 8149.8 + det = dev.bcu_bpmsum + if direction == "h": + mot = dev.s2_xcen + size = dev.s2_xsize + s_closed = 0.1 + s_open = default_h + else: + mot = dev.s2_ycen + size = dev.s2_ysize + s_closed = 0.1 + s_open = default_v + + # BCU slits ================================ + + if device_location in ["bcu", "s3", "eb"]: + default_h = 2.0 # ??? + default_v = 2.0 # ??? close more ???? # 8149.8 + # change to i0 later ?? + det = dev.i1 + dposm = 43.8 + # dpos0 = dev.scin_y.user_readback.get() + # if abs(dposm - dpos0) > 1: + # print("moving diode i1 in") + # umv(dev.scin_y, dposm) + + if direction == "h": + mot = dev.s3_xcen + size = dev.s3_xsize + s_closed = 3.0 ## very large, else does not work ! + s_open = default_h + else: + mot = dev.s3_ycen + size = dev.s3_ysize + s_closed = 3.0 ## very large ! + s_open = default_v + + # ===== ================================ + + print( + f"scanning parameters are: {device_location}, direction= {direction}, range={-range, range}, steps={nsteps}, center it? {centre}" + ) + ans = input("All good, start the scan? (y/n): ").strip().lower() + + if ans not in ("yes", "y"): + print("Ok, cancelled") + return + + dock_area = bec.gui.new() + wr = dock_area.new().new(bec.gui.available_widgets.Waveform) + + pos0 = mot.user_readback.get() + siz0 = size.user_readback.get() + umv(size, s_closed) + + s = scans.line_scan(mot, -dx, dx, steps=nsteps, exp_time=time, relative=True) + wr.plot(x_name=mot.name, y_name=det.name) + wr.x_label = mot.name + wr.y_label = det.name + + # this gives funny errors: ARRAYS differently read? + # data_x = s.scan.data.devices[mot.name][mot.name].read()["value"] + # data_x = s.scan.live_data["fe_sxcen"]["fe_sxcen"].val + # data_y = s.scan.data.devices[det.name][det.name].read()["value"] + + # FE slits ================================ + data_y = s.scan.live_data.lu_bpmsum.lu_bpmsum.val + + if mot.name == "fe_sxcen": + data_x = s.scan.live_data.fe_sxcen.fe_sxcen.val + if mot.name == "fe_sycen": + data_x = s.scan.live_data.fe_sycen.fe_sycen.val + # BSF slits ================================ + if mot.name == "s1_xcen": + data_x = s.scan.live_data.s1_xcen.s1_xcen.val + if mot.name == "s1_ycen": + data_x = s.scan.live_data.s1_ycen.s1_ycen.val + + # BSC slits ================================ + if mot.name == "s2_xcen": + data_x = s.scan.live_data.s2_xcen.s2_xcen.val + data_y = s.scan.live_data.bcu_bpmsum.bcu_bpmsum.val + if mot.name == "s2_ycen": + data_x = s.scan.live_data.s2_ycen.s2_ycen.val + data_y = s.scan.live_data.bcu_bpmsum.bcu_bpmsum.val + + # BCU slits ================================ + if mot.name == "s3_xcen": + data_x = s.scan.live_data.s3_xcen.s3_xcen.val + data_y = s.scan.live_data.i1.i1.val + if mot.name == "s3_ycen": + data_x = s.scan.live_data.s3_ycen.s3_ycen.val + data_y = s.scan.live_data.i1.i1.val + + # change to i0 later ?? + + wr.plot(data_x, data_y) + # fit Gaussian + lin BG + peak = GaussianModel() + background = LinearModel() + model = peak + background + maxy = max(data_y) + indmax = np.argmax(data_y) + xm = data_x[indmax] + + p = model.make_params(amplitude=maxy, center=xm) + g = model.fit(data_y, p, x=data_x) + print(f'Center: {g.params["center"].value:.5f} FWHM: {g.params["fwhm"].value:.5f}') + + plt.ion() + plt.figure(2) + plt.plot(data_x, data_y, "*") + plt.plot(data_x, g.best_fit) + plt.show() + + res_center = g.params["center"].value + if centre and abs(res_center) < 1.5: + print(f"moving slit centre from {pos0} to {res_center}") + umv(mot, res_center) + else: + print("not changing the slit position") + + print(f"opening slit to {s_open}") + umv(size, s_open) + return + + +##################### +### KB focusing +##################### +## note acceptance of KB; +## length = 400 mm +## about 3 mrad pitch --> roughly 1.5 mm in both dir +## + + +def kbfocus(sizex, sizey): + # + # note: bending is in neg direction, and the KB mirrors have a hysteresis + # better to bend from ONE direction only, always unbend/bend for more reproducible results + # + import math + import time + + en = getenergy() + print(f"Energy is {en} keV") + print( + "Currently only 2 sizes supported, small approx.(2 x 2.7) and medium approx.(40 x 40)" + ) + + ## is the pitch ok ? + vpitch = 2.695 + hpitch = 2.997 + + accepted_tol = 5e-3 + + vpitch_rb = dev.vfm_pitch.user_readback.get() + hpitch_rb = dev.hfm_pitch.user_readback.get() + if not math.isclose(vpitch_rb, vpitch, rel_tol=accepted_tol): + print("warning! vertical pitch may be off") + if not math.isclose(hpitch_rb, hpitch, rel_tol=accepted_tol): + print("warning! horizontal pitch may be off") + + ##### one good size: + #### careful, the scinti position ! + + vfbend_u = -0.8212 + vfbend_d = -1.3250 + + hfbend_u = -2.7270 + hfbend_d = -5.3480 + + # if sizey < 5: + # vfbend_u = -0.9895 # -1.029 + # vfbend_d = -1.4050 # -1.545 + # else: + # vfbend_u = -0.9793 + # vfbend_d = -1.4950 + # + # if sizex < 5: + # hfbend_u = -2.887 + # hfbend_d = -5.550 + # else: + # hfbend_u = -2.7378 + # hfbend_d = -5.4000 + + umv(dev.vfm_bu, vfbend_u + 0.01) + umv(dev.vfm_bd, vfbend_d + 0.01) + time.sleep(0.1) + umv(dev.vfm_bu, vfbend_u) + umv(dev.vfm_bd, vfbend_d) + + umv(dev.hfm_bu, hfbend_u + 0.01) + umv(dev.hfm_bd, hfbend_d + 0.01) + time.sleep(0.1) + umv(dev.hfm_bu, hfbend_u) + umv(dev.hfm_bd, hfbend_d) + + return + + +##### ====================== +##### goto several energies with about the right gap opening + + +def goto7(): + perp_dist = 0.0389 + umv(dev.dcm_perp, perp_dist) + gap = 6.9 # 3rd harm + umv(dev.id_gap, gap) + sete(7) + + +def goto20(): + perp_dist = 0.0389 + umv(dev.dcm_perp, perp_dist) + gap = 4.736 # 13th harm + umv(dev.id_gap, gap) + sete(20) + + +def goto27(): + perp_dist = 0.0389 + umv(dev.dcm_perp, perp_dist) + gap = 4.86 # 17th harm + umv(dev.id_gap, gap) + sete(27) + + +def goto30(): + perp_dist = 0.0389 + umv(dev.dcm_perp, perp_dist) + gap = 4.842 # 19th harm + umv(dev.id_gap, gap) + sete(30) + + +######################### +### Status of the BEC +######################### + + +def bstatus(): + # + dock_area = bec.gui.new() + + dbrowser = dock_area.new("device_browser").new( + bec.gui.available_widgets.DeviceBrowser + ) + + dock_area.new("queue").new(bec.gui.available_widgets.BECQueue) + # queue = dock_area.queue.BECQueue # give it a name + # text_box = dock_area.new().new(widget=bec.gui.available_widgets.TextBox) + # text_box.set_plain_text("Hello, World!") + # text_box.set_html_text("

Welcome to BEC Widgets

This is an example of displaying HTML text.

") + + +########################### +### move in the X-ray Eye +########################### + + +def detxeye_in(): + """ + moves in the Xray eye and the Diode below detector + """ + # + setfoc = 1.0 + setzoom = 0.1 + setxrpos = 2.0 + + setdetzpos = 260 # 200 better? + setdetypos = 214.5 + + detzpos = dev.det_z.user_readback.get() + detypos = dev.det_y.user_readback.get() + + print(f"detector position now: z = {detzpos}, y = {detypos}") + + ans = "n" + ans = input("Is it ok to move detector in and up? y/n ") + if ans == "y": + print("moving the detector up (only while detz GT 300 mm) and in") + if detzpos > 300: + umv(dev.det_y, setdetypos) + umv(dev.det_z, setdetzpos) + else: + print("ok, doing nothing") + return + + print("moving X-ray eye below Eiger in") + + xrpos = dev.xeye2_x.user_readback.get() + if abs(setxrpos - xrpos) > 1: + umv(dev.xeye2_x, setxrpos) + zoompos = dev.xeye2_zoom.user_readback.get() + if abs(setzoom - zoompos) > 1: + umv(dev.xeye2_zoom, setzoom) + focpos = dev.xeye2_focus.user_readback.get() + if abs(setfoc - focpos) > 1: + umv(dev.xeye2_focus, setfoc) + + +def detxeye_out(): + """ + moves det back and down + """ + setdetzpos = 500 + setdetypos = 0 + umv(dev.det_z, setdetzpos) + detzpos = dev.det_z.user_readback.get() + if detzpos > 300: + umv(dev.det_y, setdetypos) + + +########################### +### beamsize on scinti +########################### + + +def measure_samcam(zoom=1000): + scinti_inpos = 38.6 # mm + sc_rb = dev.scin_y.user_readback.get() + if abs(scinti_inpos - sc_rb) > 0.3: + print("Scinti not in, please move") + sys.exit(0) + + auto_exposure(cam="samcam", target=200) + a = dev.samcam_xsig.read()["samcam_xsig"]["value"] + b = dev.samcam_ysig.read()["samcam_ysig"]["value"] + sx = calc_scam_microns(a, zoom=zoom) * 2.35 + sy = calc_scam_microns(b, zoom=zoom) * 2.35 + print(f"FWHM in um : {sx, sy}") + + return sx, sy + + +def measure_bsccam(): + x_inpos = 7 # mm + px2mum = 20 + scpos_rb = dev.xeye_x.user_readback.get() + if abs(x_inpos - scpos_rb) > 0.3: + print("Scinti not in, please move") + sys.exit(0) + + auto_exposure(cam="bsccam", target=200) + a = dev.bsccam_xsig.read()["bsccam_xsig"]["value"] + b = dev.bsccam_ysig.read()["bsccam_ysig"]["value"] + sx = a * px2mum * 2.35 + sy = b * px2mum * 2.35 + print(f"FWHM at BSC cam in um : {sx, sy}") + + return sx, sy + + +######### special purpose programs ############### +################################################## + + +# EXAMPLE: knife_scan.py +def knife_edge(dir="hor", range=0.05, steps=100): + + mot = dev.gon_x + if dir == "vert": + mot = dev.gon_y + + s = scans.line_scan( + dev.gon_x, -range, range, steps=steps, exp_time=1, relative=True + ) + return + + +############## +def smoothplotgrad(dx, dy, reverse=0): + """ + for example from the knife edge scans + if reverse == 1 then mirrored for plot and fit + USE as: smoothplotgrad(dx,dy, reverse = 0) + """ + + from scipy.ndimage import gaussian_filter1d + import matplotlib.pyplot as plt + + gf = gaussian_filter1d(dy, 1) + + plt.ion() + plt.figure() + plt.plot(dx, dy, "*") + plt.plot(dx, gf) + plt.figure() + + grf = np.gradient(gf) + if reverse: + grf = -grf + plt.plot(dx, grf) + g, xm = justfit(dx, grf, model="gauss", ibg=0) + plt.plot(dx, g.best_fit) + + return + + +################################################################################## +### enery scan program around a harmonic + + +def scan_eg(erange, nsteps=50, fit=True): + """ + Scan an energy range around a certain harmonic + scan_eg(erange, steps= 50, fit= True) # default + scan_eg(erange, steps= 50, fit= False) # no fit wanted + eg, erange = 12.4 *0.02 for a width of 2 % around 12.4 keV + """ + import time + from datetime import datetime + + from lmfit.models import LinearModel, GaussianModel, VoigtModel + import matplotlib.pyplot as plt + + ## defaults + exptime = 1 + + e = getenergy() + print(f"Current energy is: {e} keV") + e_start = e - erange + e_end = e + erange + + en_resol = (e_start - e_end) / nsteps + print(f"energy resolution is: {en_resol} keV") + + mot_scan = dev.dcm_bragg + mot_pitch = dev.dcm_pitch + mot_perp = dev.dcm_perp + + print( + f"scanning parameters are: motor = {mot_scan.name}, range={e-erange, e+erange}, steps={nsteps}" + ) + ans = input("All good, start the scan? (y/n): ").strip().lower() + + if ans not in ("yes", "y"): + print("Ok, cancelled") + return + + mot_scan_rb = mot_scan.user_readback.get() + mot_pitch_rb = mot_pitch.user_readback.get() + mot_perp_rb = mot_perp.user_readback.get() + + det = dev.lu_bpmsum + + print(f"Position of bragg angle is {mot_scan_rb} mrad") + print(f"Position of pitch angle is {mot_pitch_rb} mrad") + print(f"Perp distance is {mot_perp_rb} mm") + + a_start = angle(e_start, "ln") * 1000 + a_end = angle(e_end, "ln") * 1000.0 + + print(f"Scanning Bragg from {a_start} to {a_end} mrad") + + s = scans.line_scan( + mot_scan, a_start, a_end, steps=nsteps, exp_time=exptime, relative=False + ) + + ## plot and fit the scan + bragg_data = ( + s.scan.live_data.dcm_bragg.dcm_bragg.val + ) # is a list and needs to be converted to array + bpm_data = s.scan.live_data.lu_bpmsum.lu_bpmsum.val + + if fit: + print("Bragg Angle") + bcen, xm = fit_plot(bragg_data, bpm_data, model="voigt", fitrange=1) + print("") + + br_rad = np.array(bragg_data) / 1000.0 + d0 = 2 * 5.43102 * (1 - 2.4e-4 * 1) / np.sqrt(3.0) + energy_data = 12.39842 / d0 / np.sin(br_rad) + + print("Energy") + ecen, xm = fit_plot(energy_data, bpm_data, model="voigt", fitclick=1) + + ## save data to file ? + + ans = "n" + ans = input("Store data in csv file? y/n ") + if ans == "y": + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + data_x = bragg_data + data_e = energy_data + data_y = bpm_data + + dirname = "/home/gac-x10sa/Data/" + # writing output to simple data file for later analysis: + combined = np.column_stack((data_x, data_e, data_y)) + tname = f"Scan_{timestamp}.txt" + filename = dirname + tname + with open(filename, "w") as f: + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + sete(e) # go back + return + + +########################### +### compute a signal +########################### +# see in config file diff --git a/pxii_bec/macros/Undu_Helpers.py b/pxii_bec/macros/Undu_Helpers.py new file mode 100755 index 0000000..8c984ed --- /dev/null +++ b/pxii_bec/macros/Undu_Helpers.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python +from pylab import * +import sys, os + +# import pandas as pd +import matplotlib.pyplot as plt +import numpy as np +import scipy.interpolate +from scipy.optimize import curve_fit +from scipy.ndimage import gaussian_filter1d + + + +def fit_harm(harm, n, order): + x = harm[0, :].astype(float) + y = harm[1, :].astype( + float + ) ## else funny object that might contain funny strings ... + coeff = np.polyfit(x, y, order) # 3 in general i.e., 4 params + polynomial = np.poly1d(coeff) + x_fit = np.linspace(min(x), max(x), 100) + y_fit = polynomial(x_fit) + print("Polynomial coefficients of the harmonic: ", n, coeff) + plot(x_fit, y_fit, color="blue") + + return ( + x / n, + y, + ) # get the normalized energy gap relation for fitting the Halbach coeff + + +######### simple exp fit ############ +def exponential_func0(x, a, b, c): + # fit 3 params a,b,c + + return a * np.exp(b * x + c * x ** 2) + + +######### inverse exp fit ############ +def exponential_func1(x, a, b, c): + # fit 3 params a,b,c + + return 1 / (1 + (a * np.exp(b * x + c * x ** 2))) + + +######### inverse exp fit plus E_max ############ +def exponential_func2(x, a, b, c, d): + # fit 4 params a,b,c, e.g., fit energy of storage ring as well + return d / (1 + (a * np.exp(b * x + c * x ** 2))) + + +################################## + + +def return_harmon(): + # from 3rd to 21th Energy range from 6 keV to 30 keV + # harmonics, July 2025 + import numpy as np + + nharm = 13 + h = [np.array([]) for _ in range(nharm + 1)] + h[0] = np.array([0]) + h[1] = np.array([0]) + h[2] = np.array([0]) + h[3] = np.array([1.07141725, -0.52834258]) # 3rd harmon + h[4] = np.array([0.007111, -0.13678409, 1.52295567, -1.06882785]) + h[5] = np.array([0.00315051, -0.0766359, 1.13107469, -0.86442062]) + + h[6] = np.array([0.00162862, -0.04452336, 0.82948259, -0.37329674]) + h[7] = np.array([0.00080764, -0.02418882, 0.59212947, 0.15702886]) + h[8] = np.array([1.16699242e-03, -4.68207543e-02, 9.43972396e-01, -1.94201019e00]) + h[9] = np.array([4.84194444e-04, -2.01064762e-02, 5.55141139e-01, -3.81800667e-01]) + h[10] = np.array([2.11583333e-04, -8.03503571e-03, 3.43141595e-01, 6.02318000e-01]) + h[11] = np.array([3.34391667e-03, -1.84053357e-01, 3.58338994e00, -1.93640241e01]) + h[12] = np.array([3.20520798e-05, 2.39253145e-03, 8.09198503e-02, 2.22897377e00]) + h[13] = np.array([0.00278744, 0.07979874, 2.05143916]) + + # fitpar_u19 = np.array([ 2.17078531, 0.519452, -0.00720255]) # measured from U19 + + return h + +def plot_harmon(e_start, e_end, h_no, pr_out = False): + + enarr= np.arange(e_start, e_end+1, 0.5) + + h_all = return_harmon() + h=h_all[h_no] + polynomial = np.poly1d(h) + gaps = polynomial(enarr) + + if (pr_out): + print("en =", enarr ) + print("gaps =", gaps ) + + plt.ion() + plt.figure() + + plt.plot(enarr,gaps,'*') + plt.title(f"harmonic no {h_no}") + plt.xlabel("E / keV") + plt.ylabel("Gap / mm") + plt.show() + +def setu19(en, *harm_no, detune=0): + """ + set the U19 to the gaps defined in Jul2025, or the "theoretical" ones for higher + harmonics + USAGE: + setu19(en, *harm_no, detune=0) + en in keV, possibly select a special harmonics, or detune [0/1] to a value + with a nicer beam shape but less flux + """ + g0 = dev.id_gap.readback.get() + + fitpar_u19 = np.array([2.17078531, 0.519452, -0.00720255]) # measured from U19 + + h_all = return_harmon() + # dep on energy -> select harmonics + if harm_no: + harm = int(harm_no[0]) + print("Selected harmonics is:", harm) + + else: + print("Using the default optimised harmonics") + if en <= 7: + harm = 3 # h = h[3] + elif 7 < en <= 10: + harm = 5 + elif 10 < en <= 13: + harm = 7 + elif 13 < en <= 16: + harm = 9 + elif 16 < en <= 19: + harm = 11 + elif 19 < en <= 22: + harm = 13 + + if en <= 22: + h = h_all[harm] + print(f"harmonics is {harm}.") + + polynomial = np.poly1d(h) + g = polynomial(en) + # print(f"For energy {en} the used harmonic is {polynomial} for gap {g}") + ## for time being: if E > 20 : use the "theoretical", old measured and interpolated values up to E = 30 keV + # 21, 22 : 13 + # 23, 24, 25: 15; + # 26: 15 or 17 + # 27, 28: 17 + # 29: 15, 17, 19 + # 30: 19 + ## + if 23 < en <= 25: + n = 15 + enorm = en / n + g = exponential_func0(enorm, fitpar_u19[0], fitpar_u19[1], fitpar_u19[2]) + elif 25 < en <= 29: + n = 17 + enorm = en / n + g = exponential_func0(enorm, fitpar_u19[0], fitpar_u19[1], fitpar_u19[2]) + elif 29 < en: + n = 19 + enorm = en / n + g = exponential_func0(enorm, fitpar_u19[0], fitpar_u19[1], fitpar_u19[2]) + + print("Undulator has been ", g0, " mm") + if 4.5 <= g <= 20: + print("Moving Undulator gap to ", g, " mm") + else: + print("not a valid gap, do nothing") + + if detune: + g =g *0.996 + print("moving to detuned gap value, slightly below max, about 0.15 % ") + #print("move disabled!!") + res = scans.umv(dev.id_gap, g, relative=False) + + return + +################################## +def harmon_walk(estart=7.5, end_en=13): + import time + + en = estart + ans ='y' + while en < end_en+0.5 and ans == 'y': + print(en) + setu19(en, 5) + time.sleep(2) + sete(en) + en = en+0.5 + ans = input("Next energy? y/n: ") + + +################################## +def gap_harm(e=12.4): + fitpar_u19 = np.array([2.17078531, 0.519452, -0.00720255]) + fitpar_u17 = np.array([2.17078572, 0.46477267, -0.005766]) + e = float(input("Please enter an energy between 6 and 30: ")) + + for n in range(3, 23): + + # n = int(input('Please enter a harmonic n, n between 3 and 15 ')) + enorm = e / n + g_19 = exponential_func0(enorm, fitpar_u19[0], fitpar_u19[1], fitpar_u19[2]) + g_17 = exponential_func0(enorm, fitpar_u17[0], fitpar_u17[1], fitpar_u17[2]) + print(f"harmonic {n}, gap for U19 = {g_19:.5f}, for U17 = {g_17:.5f}") + # print(f"harmonic {n}, gap for U17 = {g_17}") + + return + + +################################## + + +def long_gscan(estart=7, end_en=20.5, g_low=4.5, g_high=9.0, nsteps=1500): + import time + import numpy as np + + dirname = "/home/gac-x10sa/Data/" + + print( + f"scanning the U19 gap from {estart} keV to {end_en} keV, for a gapsize from {g_low} to {g_high}" + ) + resol = (g_high - g_low) / nsteps + print(f"nsteps = {nsteps}; resolution is {resol} mm") + dock_area = bec.gui.new("LongGapScan") + wr = dock_area.new().new(bec.gui.available_widgets.Waveform) + mot = dev.id_gap + det = dev.lu_bpmsum + wr.plot(x_name=mot.name, y_name=det.name) ## names first ! + wr.x_label = mot.name + wr.y_label = det.name + g0 = dev.id_gap.readback.get() + + # g_low = 4.5 # 4.5 + # g_high = 9.0 # 9.0 + # nsteps = 1500 # res = 3 um + + en = estart + + while en < end_en: + sete(en) + time.sleep(1) + rock() + print(f"setting energy to {en}") + time.sleep(2) + ds = scans.line_scan( + dev.id_gap, g_low, g_high, steps=nsteps, exp_time=0.8, relative=False + ) + gap_data = ds.scan.live_data.id_gap.id_gap.val + bpm_data = ds.scan.live_data.lu_bpmsum.lu_bpmsum.val + wr.plot(x=gap_data, y=bpm_data) + # writing output to simple data file for later analysis: + combined = np.column_stack((gap_data, bpm_data)) + filename = dirname + "gaps" + str(en) + ".txt" + with open(filename, "w") as f: + np.savetxt(f, combined, delimiter=",", fmt="%5f") + + en += 1 + return + + +################################## +def gscan(centre=0, gomax=0, detune=0): + """ + Scan the ID GAP and go + to the max of lu_bpm intensity + gscan(centre=0): just scan + gscan(centre=1): go to centre of fit max + gscan(centre=1, gomax=1): go to max of intensity + gscan(centre=1,detune=1): position of slightly less flux with nicer beam shape + + """ + import time + + dock_area = bec.gui.new() + wr = dock_area.new().new(bec.gui.available_widgets.Waveform) + mot = dev.id_gap + det = dev.lu_bpmsum + wr.plot(x_name=mot.name, y_name=det.name) ## names first ! + # wr.plot(x=mot.name,y=det.name) ### this comes later + wr.x_label = mot.name + wr.y_label = det.name + + g0 = dev.id_gap.readback.get() + deltag = 0.05 + ds = scans.line_scan( + dev.id_gap, -deltag, deltag, steps=30, exp_time=0.5, relative=True + ) + gap_data = ds.scan.live_data.id_gap.id_gap.val + bpm_data = ds.scan.live_data.lu_bpmsum.lu_bpmsum.val + + #maxy = max(bpm_data) + #indmax = np.argmax(bpm_data) + #gm = gap_data[indmax] + + gcen, xm = fit_plot(gap_data, bpm_data, model="gauss") + + if gomax: + gm = xm + else: + gm = gcen + + print("gap off by ", g0 - gm, " mm") + if detune: + gm=gm*0.996 + print("moving to detuned gap value, slightly (0.15 %) below max") + + if centre: + time.sleep(0.2) + if min(gap_data) <= gm <= max(gap_data): + scans.umv(dev.id_gap, gm, relative=False) + print("moving to ", gm, " mm") + else: + print("Fit too far off, try using option gomax=1") + + return + + diff --git a/pxii_bec/macros/__init__.py b/pxii_bec/macros/__init__.py old mode 100644 new mode 100755 diff --git a/pxii_bec/macros/calculator.py b/pxii_bec/macros/calculator.py new file mode 100755 index 0000000..b10a22a --- /dev/null +++ b/pxii_bec/macros/calculator.py @@ -0,0 +1,355 @@ +"""Utility functions for calculating energy, wavelength, and Bragg angle.""" + +from dataclasses import dataclass +import numpy as np +# from pxii_parameters import (EnergyDefaults, CamConversion) + + +@dataclass(frozen=True) +class Constants: + """Constants used in energy calculations""" + + # # Physical Constants from https://physics.nist.gov/cuu/Constants/index.html + ANGSTROM_CONVERSION = 1e10 # Convert meters to angstrom + PLANCK_CONST_EV = 4.135667696e-15 # eV/Hz + SPEED_OF_LIGHT = 299792458 # m/s + + # d-spacings + d_spacing = {120: 3.13481, 298: 3.13562} + + +def speed_of_light_ang(): + """ + Calculate the speed of light in angstroms per second. + + Returns: + float: The speed of light converted to angstroms per second. + """ + return Constants.SPEED_OF_LIGHT * Constants.ANGSTROM_CONVERSION + + +def en_wav_factor(): + """ + Calculate the energy wavelength factor. + + This function computes a constant factor used to calculate energy + values in relation to wavelength by combining Planck's constant, + in eV/Hz, and the speed of light in angstrom. + + Returns: + float: The computed energy wavelength factor. + """ + return Constants.PLANCK_CONST_EV * speed_of_light_ang() + + +# Helper Functions +def convert_to_degrees(angle_mrad: float) -> float: + """ + Convert an angle from milliradians to degrees. + + Args: + angle_mrad: The angle value in milliradians. + + Returns: + The angle converted into degrees as a float. + """ + return np.rad2deg(angle_mrad / 1000) + + +def create_conversion_result( + energy_ev: float, wavelength: float, bragg_angle_mrad: float +) -> dict: + """ + Creates a dictionary containing converted values of energy and angles. + + This function takes the energy in electron-volts, the wavelength, + and the Bragg angle in milliradians as input. It computes and + returns a dictionary containing the energy in both electron-volts + and kiloelectron-volts, the wavelength, the Bragg angle in milliradians, + and the Bragg angle converted to degrees. + + Args: + energy_ev: Energy value in electron-volts. + wavelength: Wavelength value. + bragg_angle_mrad: Bragg angle in milliradians. + + Returns: + dict: A dictionary containing the following keys: + - "energy_kev": Energy value in kiloelectron-volts. + - "energy_ev": Energy value in electron-volts. + - "wavelength": Wavelength value. + - "bragg_angle_mrad": Bragg angle in milliradians. + - "bragg_angle_deg": Bragg angle in degrees. + """ + return { + "energy_kev": energy_ev / 1000, + "energy_ev": energy_ev, + "wavelength": wavelength, + "bragg_angle_mrad": float(bragg_angle_mrad), + "bragg_angle_deg": float(convert_to_degrees(bragg_angle_mrad)), + } + + +def print_conversion_result(result: dict) -> None: + """ + Prints the energy-related conversion results to the console. + """ + + line = ( + f"energy: {result['energy_ev']:.6g} eV, energy: {result['energy_kev']:.6g} keV, " + f"wavelength: {result['wavelength']:.4g} Å, " + f"bragg angle: {result['bragg_angle_mrad']:.5g} mrad, {result['bragg_angle_deg']:.4g} deg" + ) + print(line) + + +# Conversion Functions +def calculate_wavelength_from_angle(bragg_angle_mrad: float, temp=120) -> float: + """ + calculate_wavelength_from_angle(bragg_angle_mrad: float) -> float + + Arguments: + bragg_angle_mrad: The Bragg angle in milliradians, used to compute the + sine value required for the wavelength calculation. + + Returns: + The calculated wavelength as a float value. + """ + d = Constants.d_spacing[temp] + return 2 * d * np.sin(bragg_angle_mrad / 1000) + + +def calculate_energy_from_wavelength(wavelength: float) -> float: + """ + Calculates the energy of a photon based on its wavelength. + + Args: + wavelength: The wavelength of the photon in angstrom. + + Returns: + The energy of the photon in eV. + """ + return en_wav_factor() / wavelength + + +def calculate_wavelength_from_energy(energy_ev: float) -> float: + """ + Calculates the wavelength of a photon from its energy. + + Arguments: + energy_ev: float + The energy of the photon in electronvolts (eV). + + Returns: + float + The calculated wavelength of the photon in angstrom. + """ + return en_wav_factor() / energy_ev + + +def calculate_bragg_angle_from_wavelength(wavelength: float, temp=120) -> float: + """ + Calculate the Bragg angle in milliradians for a given wavelength. + + Args: + wavelength: The wavelength in angstrom. + + Returns: + The Bragg angle in milliradians as a float. + """ + d = Constants.d_spacing[temp] + angle_rad = np.arcsin(wavelength / (2 * d)) + return angle_rad * 1000 + + +def convert_input_angle_to_mrad(bragg_angle: float) -> float: + """ + Convert input angle into milliradians (mrad). + + This function takes an angle as input and determines its likely unit, + converting it to milliradians (mrad) if necessary. If the input value + is less than 1, it is assumed to be in radians and is converted to + mrad. If the input value falls between predefined minimum and + maximum values for mrad, it is assumed to be in degrees and thus + converted to mrad using the degrees-to-radians conversion factor. + + For input values that don't match these scenarios, it assumes + that the input is already in mrad and returns it unchanged. + + Arguments: + bragg_angle (float): The input Bragg angle, which can be in + radians, degrees, or milliradians. + + Returns: + float: The Bragg angle converted into milliradians (mrad). + """ + if bragg_angle < 1: # Likely the input angle is in radians + return bragg_angle * 1000 + if 3 < bragg_angle < 25: # Likely input angle is in degrees + return np.deg2rad(bragg_angle) * 1000 + return bragg_angle # Already in mrad + + +# Core Functions +def validate_energy(energy_ev): + """ + Validates the energy value to ensure it falls within the acceptable range. The function + converts the provided energy from keV to eV if the input value is less than 1/1000 of the + maximum energy value. It then checks whether the energy is within the defined bounds. + If the energy value is outside the acceptable range, the function raises a ValueError. + + Args: + energy_ev (float): The energy value in eV or keV to be validated. If this value is + smaller than 1/1000 of the maximum allowed energy (in eV), it will be multiplied + by 1000 to convert it from keV to eV. + + Returns: + float: The validated energy value in eV that falls within the acceptable range. + + Raises: + ValueError: If the energy value is outside the defined range of + [MIN_ENERGY_EV, MAX_ENERGY_EV]. + """ + if energy_ev < EnergyDefaults.max_energy_ev / 1000: # Assuming the input is in keV. + energy_ev *= 1000 + if not EnergyDefaults.min_energy_ev <= energy_ev <= EnergyDefaults.max_energy_ev: + raise ValueError( + f"Energy of {energy_ev} eV is outside the valid range " + f"({EnergyDefaults.min_energy_ev} eV to {EnergyDefaults.max_energy_ev} eV)" + ) + return energy_ev + + +def convert_from_bragg( + bragg_angle_mrad: float, temp=120, print_result: bool = False +) -> dict: + """ + Convert the Bragg angle to wavelength and energy, returning the result as a dictionary. + + This function converts a given Bragg angle (in milliradians) into the corresponding + wavelength and energy values, and returns them in a dictionary format. The function + also supports optional printing of the calculated results. + + Args: + bragg_angle_mrad (float): The Bragg angle in milliradians to be converted. + print_result (bool): Whether to print the conversion result. Defaults to False. + + Returns: + dict: A dictionary containing the following keys: + - 'energy_ev': Energy in electronvolts. + - 'wavelength': Wavelength corresponding to the input angle. + - 'bragg_angle_mrad': Input Bragg angle in milliradians. + """ + bragg_angle_mrad = convert_input_angle_to_mrad(bragg_angle_mrad) + wavelength = float(calculate_wavelength_from_angle(bragg_angle_mrad, temp=temp)) + energy_ev = float(calculate_energy_from_wavelength(wavelength)) + result = create_conversion_result(energy_ev, wavelength, bragg_angle_mrad) + if print_result: + print_conversion_result(result) + return result + + +def convert_from_energy(energy_ev: float, temp=120, print_result: bool = False) -> dict: + """ + Convert energy in electron volts (eV) to wavelength and Bragg angle in milliradians + (mrad). This method validates the given energy, calculates corresponding properties, + and optionally prints the result. + + Args: + energy_ev: Energy value in electron volts (float) to be converted. + print_result: Flag indicating whether to print the resulting + conversion details (bool). Defaults to False. + + Returns: + A dictionary containing the following key-value pairs: + - "energy_ev" (float): Validated energy in eV. + - "wavelength" (float): Calculated wavelength in meters. + - "bragg_angle_mrad" (float): Calculated Bragg angle in mrad. + """ + energy_ev = validate_energy(energy_ev) + wavelength = calculate_wavelength_from_energy(energy_ev) + bragg_angle_mrad = float( + calculate_bragg_angle_from_wavelength(wavelength, temp=temp) + ) + result = create_conversion_result(energy_ev, wavelength, bragg_angle_mrad) + if print_result: + print_conversion_result(result) + return result + + +def convert_from_wavelength( + wavelength: float, + temp: float = 120, + print_result: bool = False, +) -> dict: + """ + Convert a given wavelength value into corresponding energy, Bragg angle, and + generate a result dictionary. + + The function processes a wavelength value, checks its validity against a + permitted range, calculates corresponding energy and Bragg angle, and + formats the results into a dictionary. Optionally, the function can print + the result. + + Parameters: + wavelength: float + The input wavelength value in Angstroms to be converted. Should + fall within the permitted wavelength range. + print_result: bool + Optional flag indicating whether to print the conversion result. + Default is False. + + Returns: + dict + A dictionary containing the energy (electron-volts), wavelength + (Angstroms), and Bragg angle (milliradians). If the wavelength is + outside of the permitted range, returns None. + """ + energy_ev = calculate_energy_from_wavelength(wavelength) + bragg_angle_mrad = float( + calculate_bragg_angle_from_wavelength(wavelength, temp=temp) + ) + result = create_conversion_result(energy_ev, wavelength, bragg_angle_mrad) + if print_result: + print_conversion_result(result) + return result + + +def calc_perp_position( + energy_ev: float, + print_result: bool = False, +) -> float: + """ + Calculate the perpendicular motor position based on provided energy in electron-volts (eV). + + This function computes the perpendicular motor position using the given energy value in + electron-volts. The calculation is based on the Bragg angle derived from the energy. An optional + parameter allows printing the result during execution. + + Parameters: + energy_ev (float): The energy value in electron-volts used for the calculation. + print_result (bool): Flag to determine whether to print the computed perpendicular offset. + Default is False. + + Returns: + float: The computed perpendicular position. + + Raises: + None + """ + result = convert_from_energy(energy_ev, print_result=False) + bragg_angle_rad = result["bragg_angle_mrad"] / 1000 + perp_offset = float(EnergyDefaults.beam_offset / (2 * np.cos(bragg_angle_rad))) - 3 + if print_result: + print(f"Perp = {perp_offset: .4f}") + return perp_offset + + +def calc_scam_microns(pixels, zoom=1000): + """Convert pixels to microns for the sample camera""" + return float(pixels / (CamConversion.a * np.exp(CamConversion.b * zoom))) + +def calc_bsccam_microns(pixels): + """Convert pixels to microns for the BSC camera""" + return pixels*20 + diff --git a/pxii_bec/macros/flux_di.py b/pxii_bec/macros/flux_di.py new file mode 100755 index 0000000..a22a174 --- /dev/null +++ b/pxii_bec/macros/flux_di.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +# ### PYTHON pro to determine the flux from the current +# ### on a Si diode +# ### A Pauluhn, Dec 2012 +# ### from IDL pro flux_diode.pro (2010) + + +## NOTE: for TRANSFER DIODE: +## switch off ANY ambient light +## note that: 10 um Si, and 0 um Al for transfer diode Hamamatsu + +# ### import the standard +import os, sys, string, time +import re +import numpy as np +import math + + + + +def fluxcalc(curr, en, t_air, t_si, t_al): + + #------ parameters: + rsi = 2.33 # g/cm^3 density of Si + ral = 2.699 # g/cm^3 density of Al + rair = 1.205e-3 # g/cm^3 density of air + + eps_si = 3.62 # eV , energy req for charge separation in Si (el/hole pair) + e = 1.602e-19 # As , elem charge + + t_si = 0.0012 # thickness of diode in cm (==12 micrometres) ; [*10000] + t_al = 0.002 # thickness of diode in cm (==20 micrometres) + + #--------- calc---------------------------------------------------- + #energy deposit in Silicon + #ratio of photoelectric cross section to density for Silicon + #alog10(A_Si)= 4.158 - 2.238*alog10(Ep) - 0.477*alog10(Ep)^2 + 0.0789*alog10(Ep)^3 + + ps = np.poly1d([0.0789, - 0.477, - 2.238, 4.158]) + #print ps + # ps.c + # print ps(2.3) + + leval = math.log10(en) + hlp_si = ps(leval) + a_si = np.pow(10, hlp_si) + #efact = np.exp(-A_Si*rsi*t_Si) + #sifact =1 - efact + sifact = - np.expm1(-a_si*rsi*t_si) + #print 'sifact = ', sifact + + fl = 1000.*curr * eps_si/1.602/en/sifact *1.e13 # curr expected in A + #print 'fl = ', fl + + #Aluminium attenuation + #ratio of photoelectric cross section to density for Aluminium + pa = np.poly1d([ 0.0638, - 0.413, - 2.349, 4.106]) + + hlp_al = pa(leval) + a_al = pow(10, hlp_al) + + # attenuation due to aluminium + alfact = np.exp(-a_al*ral*t_al) + #print 'alfact = ', alfact + # Air attenuation + # ratio of photoelectric cross section to density for air + + pair = np.poly1d([0.928, - 2.348, - 1.026, 3.153]) + hlp_air = pair(leval) + a_air = np.pow(10, hlp_air) + + #print('hlp_air, a_air, rair, t_air = ', hlp_air, a_air, rair , t_air) + + # attenuation due to air + airfact = np.exp(-a_air*rair*t_air/10.) + #airfact = 1. + + + #print 'airfact = ', airfact + #print ' t_air = ', t_air + + # total flux from photocurrent + fl = fl/alfact/airfact + #print 'fl = ', fl + + return fl + +# +# Main +# + +def flux_x10sa(): + curr = input('Please enter the current [in A] ') + curr = float(curr) + print('Current = ', curr) + en = input('Please enter the energy [in keV] ') + en = float(en) + print('Energy = ', en) + ## d_det = input('Please enter the detector distance [in mm] ') + d_det = dev.det_z.user_readback.get() + d_off = 15 ## at X10SA ### input('Please enter the OFFSET of the diode to det surface [in mm] ') + + t_air = float(d_det) + float(d_off) + print('Air path length (CARE ! add det distance and offset of diode) = ', t_air) # include additional diode distance + + # t_si = input('Please enter the Si thickness of diode [in um] ') + # t_si = float(t_si) / 10000. # to cm + + t_si = 12 # um ## at X10SA + # t_al = input('Please enter the Al thickness of diode [in um] ') + # t_al = float(t_al) / 10000. # to cm + + t_al = 20 # um ## at X10SA + fl = fluxcalc(curr, en, t_air,t_si, t_al) + + print( " Diode Current = %.6f A " % curr) + print( " Energy = %.4f keV " % en ) + + print( " Thickness of active Si layer = %.4f\t cm " % t_si) #*10000. + print( " Thickness of Al layer in front of diode = %.4f\t cm " % t_al) #*10000. + print( " Length of path of air in front of diode = %.4f\t mm " % t_air) #*10000. + + print (" Flux = %6.2e\t ph/s " % fl) + + return fl + diff --git a/pxii_bec/macros/mx_basics.py b/pxii_bec/macros/mx_basics.py new file mode 100755 index 0000000..aa8e522 --- /dev/null +++ b/pxii_bec/macros/mx_basics.py @@ -0,0 +1,254 @@ +"""Get data from an h5 file or BEC history and perform fitting.""" + +import numpy as np +from lmfit.models import ( + GaussianModel, + LorentzianModel, + VoigtModel, + ConstantModel, + LinearModel, +) +from scipy.ndimage import gaussian_filter1d +import h5py +import matplotlib.pyplot as plt + + +def create_fit_parameters( + deriv: bool = False, + model: str = "Voigt", + baseline: str = "Linear", + smoothing: None = None, +): + """Store the fit parameters in a dictionary.""" + # map input model to lmfit model name + model_mappings = { + "Gaussian": GaussianModel, + "Lorentzian": LorentzianModel, + "Voigt": VoigtModel, + "Constant": ConstantModel, + "Linear": LinearModel, + } + return { + "deriv": deriv, + "model": model_mappings[model], + "baseline": model_mappings[baseline], + "smoothing": smoothing, + } + + +def get_data_from_h5(signal_name: str = "lu_bpmsum"): + """Get data from an h5 file.""" + with h5py.File("scan_676.h5", "r") as f: + entry = f["entry"]["collection"] + y_data = entry["devices"][signal_name][signal_name]["value"][:] + motor_data = entry["metadata"]["bec"] + motor_name = motor_data["scan_motors"][0].decode() + scan_number = motor_data["scan_number"][()] + x_data = entry["devices"][motor_name][motor_name]["value"][:] + return { + "x_data": x_data, + "y_data": y_data, + "signal_name": signal_name, + "motor_name": motor_name, + "scan_number": str(scan_number), + } + + +def get_data_from_history( + history_index: int, + signal_name: str = "lu_bpmsum", +): + """Read data from the BEC history and return the X and Y data as arrays.""" + scan = bec.history[history_index] + md = scan.metadata["bec"] + motor_name = md["scan_motors"][0].decode() + scan_number = md["scan_number"] + x_data = scan.devices[motor_name][motor_name].read()["value"] + y_data = scan.devices[signal_name][signal_name].read()["value"] + return { + "signal_name": signal_name, + "x_data": x_data, + "y_data": y_data, + "motor_name": motor_name, + "scan_number": scan_number, + } + + +def process_data(data, fit_params): + """ + Process the signal data for fitting based on derivative or smoothing. + """ + smoothing, deriv = fit_params["smoothing"], fit_params["deriv"] + signal_name = data["signal_name"] + y_data = data["y_data"] + + if deriv: + if smoothing: + y_smooth = gaussian_filter1d(y_data, smoothing) + fitting_data = np.gradient(y_smooth) + signal_name = f"Derivative of smoothed {signal_name}" + else: + fitting_data = np.gradient(y_data) + signal_name = f"Derivative of {signal_name}" + elif smoothing and smoothing > 0.01: + fitting_data = gaussian_filter1d(y_data, smoothing) + signal_name = f"Smoothed {signal_name}" + else: + fitting_data = y_data + + updated_data = { + "y_to_fit": fitting_data, + "signal_name": signal_name, + } + data.update(updated_data) + return data + + +def fit(data, fit_params): + """Fit a signal to a model and return the fitting results.""" + # Create the model + peak_model = fit_params["model"](prefix="peak_") + baseline_model = fit_params["baseline"](prefix="base_") + full_model = peak_model + baseline_model + + # Prepare data + processed_data = process_data(data, fit_params) + params = full_model.make_params() + y_min = np.min(processed_data["y_to_fit"]) + + # Configure baseline parameters + if fit_params["baseline"] == ConstantModel: + params["base_c"].set(value=y_min) + elif fit_params["baseline"] == LinearModel: + params["base_intercept"].set(value=y_min) + params["base_slope"].set(value=0) + + # Add peak-specific parameters + params.update( + peak_model.guess(processed_data["y_to_fit"], x=processed_data["x_data"]) + ) + + # Perform the fitting + lmfit_result = full_model.fit( + processed_data["y_to_fit"], params, x=processed_data["x_data"] + ) + + # Find the X that gives the max Y + max_index = np.argmax(processed_data["y_to_fit"]) + x_max = processed_data["x_data"][max_index] + + # Collect results + return { + "model": fit_params["model"].__name__, + "fwhm": lmfit_result.params["peak_fwhm"].value, + "centre": lmfit_result.best_values["peak_center"], + "height": lmfit_result.params["peak_height"].value, + "chi_sq": lmfit_result.chisqr, + "lmfit_result": lmfit_result, + "x_max": x_max, + } + + +def plot_fitted_data(data, fit_result): + """Plot the original data and the fitted model.""" + plt.plot(data["x_data"], data["y_to_fit"], label="Data") + plt.plot( + data["x_data"], + fit_result["lmfit_result"].best_fit, + "-", + label=f"FWHM = {fit_result['fwhm']:.3f}," + f"Centre = {fit_result['centre']:.3f}, " + f"Height = {fit_result['height']:.3f}", + ) + plt.xlabel(data["motor_name"]) + plt.ylabel(data["signal_name"]) + plt.title(f"Scan {data['scan_number']}, fitted with {fit_result['model']}") + plt.grid(True) + plt.legend() + plt.show() + + +def select_bec_window(dock_area_name="Fitting"): + """Check to see if the fitting results dock is already open and re-create it if not""" + open_docks = bec.gui.windows + if open_docks.get(dock_area_name) is None: + dock_area = bec.gui.new(dock_area_name) + wf = dock_area.new("Plot").new(bec.gui.available_widgets.Waveform) + text_box = dock_area.new("Results", position="bottom").new( + widget=bec.gui.available_widgets.TextBox + ) + else: + wf = bec.gui.Fitting.Plot.Waveform + text_box = bec.gui.Fitting.Results.TextBox + return wf, text_box + + +def plot_live_data_bec( + motor_name, + signal_name, + window_name="Fitting" +): + """ + Plotting live data for motor and signal using BEC. + + This function plots live data from a specified motor and signal. + It clears the current plot window, sets its title, labels the axes + with the provided motor and signal names, and initializes live plotting + on the given signal against the motor. + + Args: + motor_name (str): The name of the motor to be used as the x-axis. + signal_name (str): The name of the signal to be used as the y-axis. + + Returns: + None + """ + wf, text_box = select_bec_window(window_name) + text_box.set_plain_text("Plotting live data") + wf.clear_all() + wf.title = "Scan: Live scan" + wf.x_label = motor_name + wf.y_label = signal_name + wf.plot(x_name=motor_name, y_name=signal_name) + + +def plot_fitted_data_bec( + data, + fit_result, +): + """ + Plot fitted data and display fitting parameters in the specified window. + + This function selects a BEC window and plots the original data along with the + fitted function. Additionally, it displays the fitting results in a text + box within the same window for better visualization of the fit results. + + Parameters: + data : dict + Dictionary containing the original dataset, where 'x_data' and 'y_to_fit' + hold the independent variable and the dependent variable, respectively, + 'scan_number' represents the scan number, 'motor_name' and 'signal_name' + provide axis labels. + fit_result : dict + Dictionary containing the results of the fit, including parameters such + as 'centre', 'fwhm', 'height', and the fitted model stored under + 'lmfit_result', with its 'best_fit' attribute representing the fitted data. + """ + wf, text_box = select_bec_window() + fit_text = ( + f"Fit parameters: Centre = {fit_result['centre']:.4f}, " + f"FWHM = {fit_result['fwhm']:.3f}, " + f"Height = {fit_result['height']:.4f}\n" + f"Model = {fit_result['model']}\n" + f"Chi sq = {fit_result['chi_sq']:.3g}" + ) + text_box.set_plain_text(fit_text) + wf.clear_all() + wf.title = f"Scan: {data['scan_number']}" + wf.x_label = data["motor_name"] + wf.y_label = data["signal_name"] + wf.plot(x=data["x_data"], y=data["y_to_fit"], label="data") + wf.plot( + x=data["x_data"], y=fit_result["lmfit_result"].best_fit, label="Fit to data" + ) + diff --git a/pxii_bec/macros/mx_methods.py b/pxii_bec/macros/mx_methods.py new file mode 100755 index 0000000..181ae3a --- /dev/null +++ b/pxii_bec/macros/mx_methods.py @@ -0,0 +1,344 @@ +"""Use the methods in mx_basics to perform: +1) a go_to_peak scan, that scans a motor, finds the peak position and moves to peak +2) fits data from a bec history file +""" + +import numpy as np +# from pxiii_parameters import FitDefaults, BPMScans, MirrorConfig + +# from mx_basics import ( +# create_fit_parameters, +# get_data_from_history, +# fit, +# plot_fitted_data_bec, +# plot_live_data_bec, +# ) + + +# Method functions +def calculate_step_size(start: float, stop: float, steps: int) -> float: + """ + Provides the function to calculate the step size for dividing a specified range + into a given number of steps. + + Args: + start: The starting value of the range. + stop: The stopping value of the range. + steps: The number of steps to divide the range into. Must be at least 1. + + Raises: + ValueError: If the steps value is less than 1. + + Returns: + The calculated step size as a float, rounded to three decimal places. + """ + if steps < 1: + raise ValueError("Number of steps must be at least 1.") + return round((stop - start) / steps, 3) + + +def move_to_position(motor_device, motor_name: str, position: float, data: dict): + """ + Function to move a specified motor device to a given position. + + The function verifies if the requested position is within the scan range of the + motor device provided. If the position is outside the range, the motor is + moved to the center of its scan range, an error message is raised, and the + operation is halted. If the position is valid, the motor is moved to the + specified position. + + Parameters: + motor_device: The motor device to be moved. + motor_name: str + The name of the motor as a string + position: float + The desired position to move the motor to. Position should be within + the scan range of the motor determined by the provided data. + data: dict + A dictionary containing "x_data", which is used to determine the + scan range of the motor. + + Raises: + ValueError: Raised if the specified position is outside the valid scan + range determined by "x_data" in the data dictionary. The motor will + return to the center of its scan range in this case. + """ + + motor_min = np.min(data["x_data"]) + motor_max = np.max(data["x_data"]) + motor_centre = (motor_max + motor_min) / 2 + + if not motor_min <= position <= motor_max: + umv(motor_device, motor_centre) + msg = ( + f"Position {position: .2f} is outside the scan range of " + f"{motor_min: .2f}to {motor_max: .2f}. " + f"Returning to centre of scan range {motor_centre: .3f}." + ) + raise ValueError(msg) + motor_position = round(position, 4) + umv(motor_device, motor_position) + print(f"\n Moving {motor_name} to position {motor_position: .3f}") + + +def go_to_peak( + motor_device, + signal_device, + start: float, + stop: float, + steps: int, + relative: bool = FitDefaults.RELATIVE_MODE, + plot: bool = True, + settle: float = FitDefaults.SETTLE_TIME, + confirm: bool = True, + gomax: bool = False, +): + """ + Go to the peak of a signal by scanning a motor within a specified range and + identifying the optimal position based on signal peak data. + + Parameters: + motor_device: The motor device to be scanned. + signal_device: The signal device to monitor during the scan. + start (float): The starting position of the scan. Ignored if `relative` is True. + stop (float): The ending position of the scan. Ignored if `relative` is True. + steps (int): The number of steps to divide the scan range into. + relative (bool, optional): If True, interpret `start` and `stop` as relative to + the current motor position. Defaults to RELATIVE_MODE constant. + plot (bool, optional): If True, plot the scan data and the fitted results. + Defaults to True. + settle (float, optional): The time in seconds to wait after each step for the + signal to stabilize. Defaults to DEFAULT_SETTLE_TIME constant. + confirm (bool, optional): If True, ask for user confirmation before starting + the scan. Defaults to True. + + Raises: + Exception: Raises exceptions potentially raised by dependent functions or + operations such as plotting, fitting, or motor movement. + + Returns: + None + """ + motor_name = motor_device.name + signal_name = signal_device.name + # wf.plot(x_name=motor_name, y_name=signal_name) + if plot: + plot_live_data_bec(motor_name, signal_name) + + # Validate and calculate step size + step_size = calculate_step_size(start, stop, steps) + + # Confirm the scan range + # current_motor_position = motor_device.user_readback.get() + current_motor_position = motor_device.read()[motor_name]["value"] + if confirm: + if relative: + scan_start = current_motor_position + start + scan_end = current_motor_position + stop + print( + f"\nScanning from {scan_start: .6g} to {scan_end: .6g} in " + f"{steps} steps of size {step_size}" + ) + print(f"Relative mode = {relative}") + else: + print( + f"\nScanning from {start: .5g} to {stop: .5g} in {steps} steps of size {step_size}" + ) + print(f"Relative mode = {relative}") + input("Press Enter to continue...") + # Perform the scan + scan_result = scans.line_scan( + motor_device, start, stop, steps=steps, relative=relative, settling_time=settle + ) + motor_data = scan_result.scan.live_data[motor_name][motor_name].val + signal_data = scan_result.scan.live_data[signal_name][signal_name].val + scan_number = "Current" + + data = { + "x_data": np.array(motor_data), + "y_data": np.array(signal_data), + "motor_name": motor_name, + "signal_name": signal_name, + "motor_device": motor_device, + "scan_number": scan_number, + } + + # Define and fit model to scan data + fit_params = create_fit_parameters(False, FitDefaults.MODEL, FitDefaults.BASELINE) + fit_result = fit(data, fit_params) + + # Plot the fitted data if plot = True + if plot: + plot_fitted_data_bec(data, fit_result) + + # If gomax is set then move to the maximum value, rather than the fit centre + if gomax: + value = fit_result["x_max"] + print(f"Max position is at {value}") + move_to_position( + data["motor_device"], data["motor_name"], fit_result["x_max"], data + ) + else: + # Safely move the motor to the peak position + move_to_position( + data["motor_device"], data["motor_name"], fit_result["centre"], data + ) + + +def fit_history( + history_index: int, + signal_name: str, + deriv: bool = False, + model: str = FitDefaults.MODEL, + move_to_peak: bool = False, +): + """ + Retrieve and analyze historical data by fitting a model, optionally moving to + a peak position. + + Parameters: + history_index (int): Index of the historical data set to retrieve. + signal_name (str): Name of the signal to fit. + deriv (bool, optional): Whether to include the derivative in the fitting + procedure. Defaults to False. + model (str, optional): Name of the model to use for fitting. Defaults to + DEFAULT_MODEL. + move_to_peak (bool, optional): Whether to move the motor to the peak position + after fitting. Defaults to False. + + Raises: + KeyError: If required keys are not found in the retrieved data dictionary. + ValueError: If the fitting process fails or produces invalid results. + + Returns: + None + """ + # Retrieve historical data + data = get_data_from_history(history_index, signal_name) + + # Define fitting parameters + fit_params = create_fit_parameters(deriv, model, FitDefaults.BASELINE) + + # Perform fit and plot the data + fit_result = fit(data, fit_params) + plot_fitted_data_bec(data, fit_result) + + # Optionally move the motor to the peak position + if move_to_peak: + move_to_position( + data["motor_device"], data["motor_name"], fit_result["centre"], data + ) + + +def scan_bpm(bpmname): + """ + Runs a grid scan of a BPM in x and y, and plots each channel + as a heatmap. + + Parameters: + bpmname: the name of the bpm to be scanned e.g. "fe" + + """ + + # Open a dock area and set up the heatmaps + dock_area = bec.gui.new("XBPM_Scan") + wf5 = dock_area.new("Sum").new(bec.gui.available_widgets.Heatmap) + wf1 = dock_area.new("Ch1", relative_to="Sum", position="bottom").new( + bec.gui.available_widgets.Heatmap + ) + wf3 = dock_area.new("Ch3", relative_to="Ch1", position="right").new( + bec.gui.available_widgets.Heatmap + ) + wf4 = dock_area.new("Ch4", relative_to="Ch3", position="bottom").new( + bec.gui.available_widgets.Heatmap + ) + wf2 = dock_area.new("Ch2", relative_to="Ch1", position="bottom").new( + bec.gui.available_widgets.Heatmap + ) + wfscan = dock_area.new("ScanControl").new(bec.gui.available_widgets.ScanControl) + + cfg = getattr(BPMScans, bpmname) + + wf1.x_label = cfg["x_name"] + wf1.y_label = cfg["y_name"] + wf1.plot( + x_name=cfg["x_name"], + y_name=cfg["y_name"], + z_name=cfg["z1_name"], + color_map="plasma", + ) + + wf2.x_label = cfg["x_name"] + wf2.y_label = cfg["y_name"] + wf2.plot( + x_name=cfg["x_name"], + y_name=cfg["y_name"], + z_name=cfg["z2_name"], + color_map="plasma", + ) + + wf3.x_label = cfg["x_name"] + wf3.y_label = cfg["y_name"] + wf3.plot( + x_name=cfg["x_name"], + y_name=cfg["y_name"], + z_name=cfg["z3_name"], + color_map="plasma", + ) + + wf4.x_label = cfg["x_name"] + wf4.y_label = cfg["y_name"] + wf4.plot( + x_name=cfg["x_name"], + y_name=cfg["y_name"], + z_name=cfg["z4_name"], + color_map="plasma", + ) + + wf5.x_label = cfg["x_name"] + wf5.y_label = cfg["y_name"] + wf5.plot( + x_name=cfg["x_name"], + y_name=cfg["y_name"], + z_name=cfg["z5_name"], + color_map="plasma", + ) + # Run the scan + x_mot = cfg["x_device"] + y_mot = cfg["y_device"] + # scans.grid_scan(x_mot, -0.5, 0.5, 20, y_mot, -0.5, 0.5, 20, + # exp_time=0.5, relative=False, snaked=True) + + +def optimise_kb(mirror): + """ + Runs a grid scan of a the upstream and downstream benders, + and plots a heatmap of the sample camera x or y sigma. + + Parameters: + mirror: either "hfm" or :vfm" + + """ + + # Open a dock area and set up the heatmaps + dock_area = bec.gui.new(mirror) + wf1 = dock_area.new("Heatmap").new(bec.gui.available_widgets.Heatmap) + + wfscan = dock_area.new("ScanControl").new(bec.gui.available_widgets.ScanControl) + + cfg = getattr(MirrorConfig, mirror) + + wf1.x_label = cfg["bu_name"] + wf1.y_label = cfg["bd_name"] + wf1.plot( + x_name=cfg["bu_name"], + y_name=cfg["bd_name"], + z_name=cfg["z_name"], + color_map="plasma", + ) + + # Run the scan + x_mot = cfg["x_device"] + y_mot = cfg["y_device"] + # scans.grid_scan(x_mot, -0.02, 0.02, 11, y_mot, -0.02, 0.02, 11, + # exp_time=0.5, relative=True, snaked=True) diff --git a/pxii_bec/macros/pxii_energy.py b/pxii_bec/macros/pxii_energy.py new file mode 100755 index 0000000..f1cc97f --- /dev/null +++ b/pxii_bec/macros/pxii_energy.py @@ -0,0 +1,244 @@ +"""Script to change energy at PXII by setting gap, DCM motors and mirror stripe + +Moving DCM motors - implemented for Bragg, pitch and perp +Gap - optional, can be switched off using move_gap=False +Mirrors - change of mirror stripe is not yet implemented +Plotting optional + +""" + +import numpy as np +# from pxii_gap import set_gap +# from mx_methods import go_to_peak +# from pxii_parameters import EnergyDefaults, Calibration + +# from calculator import ( +# calc_perp_position, +# validate_energy, +# convert_from_bragg, +# convert_from_energy, +# ) + + +def get_current_energy(): + """ + Returns the energy in eV from the current bragg angle. + """ + current_bragg_angle = dev.dcm_bragg.user_readback.get() + current_energy = convert_from_bragg(current_bragg_angle, print_result=False)[ + "energy_ev" + ] + return current_energy + + +# Functions below are common to all beamlines +def calculate_energy_difference(current_energy, target_energy): + """ + Calculate the absolute difference in energy between the current energy level + and the target energy level. + """ + return abs(target_energy - current_energy) + + +def get_mirror_stripe(energy_ev): + """ + Determines the mirror stripe material based on the energy level provided + and the specified thresholds for silicon, rhodium, and platinum. + + Args: + energy_ev (float): Energy level in electron volts, used to determine + the corresponding material type. + + Returns: + str: A string indicating the material type corresponding to the provided + energy level. Possible values are "silicon", "rhodium", or "platinum". + """ + if energy_ev <= EnergyDefaults.stripe_thresholds["silicon"]: + return "silicon" + if ( + EnergyDefaults.stripe_thresholds["silicon"] + < energy_ev + <= EnergyDefaults.stripe_thresholds["rhodium"] + ): + return "rhodium" + return "platinum" + + +def set_mirror_stripe(energy_ev): + """ + Selects and sets the appropriate mirror stripe based on the given energy value + in electron volts (eV). + + Args: + energy_ev (float): The energy value in electron volts + + Returns: + None + """ + selected_stripe = get_mirror_stripe(energy_ev) + print(f"Selected mirror stripe: {selected_stripe}") + + +def mono_pitch_scan(plot=True): + """Scan the monochromator pitch and move to the peak.""" + + if plot: + print("Scanning monochromator pitch and moving to peak, with plotting.") + go_to_peak( + EnergyDefaults.mono_pitch, + EnergyDefaults.signals["sig1"], + -EnergyDefaults.pitch_scan["halfwidth"], + EnergyDefaults.pitch_scan["halfwidth"], + steps=EnergyDefaults.pitch_scan["steps"], + relative=True, + settle=0.01, + plot=True, + confirm=False, + ) + else: + print("Scanning monochromator pitch and moving to peak, without plotting.") + go_to_peak( + EnergyDefaults.mono_pitch, + EnergyDefaults.signals["sig1"], + -EnergyDefaults.pitch_scan["halfwidth"], + EnergyDefaults.pitch_scan["halfwidth"], + steps=EnergyDefaults.pitch_scan["steps"], + relative=True, + settle=0.01, + plot=False, + confirm=False, + ) + + +# Specific functions - need to be edited for each beamline + + +def move_gap_if_needed(energy_ev, move_gap=False): + """ + Move the gap position based on energy if the move_gap flag is set. + + Args: + energy_ev (float): The energy value in electron volts (eV) used for gap movement. + move_gap (bool): A flag indicating whether the gap position should be moved or not. + + Returns: + None + """ + if move_gap: + set_gap(energy_ev) + else: + print("Not moving gap.") + + +# def get_roll(energy_ev): +# """Calculates the roll based on calibration data.""" +# calib = Calibration() +# roll = np.poly1d(calib.roll)(energy_ev) +# if roll < 0: +# return 0 +# if roll >= 9.95: +# return 9.95 +# return float(roll) + + +def get_dcm_motors_positions(energy_ev): + """ + Pitch and roll are calculated based on fits of the measured calibration values. + Perp and Bragg are calculated based on the energy value via calculator.py. + + Arguments: + energy_ev (float): The energy value in electron volts for which the + DCM motor positions are to be calculated. + + Returns: + dict: A dictionary containing the calculated DCM motor positions + including values retrieved from the lookup table, Bragg angle + in milliradians, and perpendicular position. + """ + # dcm_motor_values = get_value_from_lut(energy_ev) + dcm_motor_values = {} + # pitch = float(np.poly1d(Calibration.pitch)(energy_ev)) + pitch = -5.304 + roll = EnergyDefaults.mono_roll_value + perp = calc_perp_position(energy_ev, print_result=False) + bragg_angle = convert_from_energy(energy_ev, print_result=False)["bragg_angle_mrad"] + dcm_motor_values.update( + { + "bragg_angle": bragg_angle, + "perp": perp, + "dcm_pitch": pitch, + "dcm_roll": roll, + } + ) + return dcm_motor_values + + +def move_dcm_motors(energy_ev): + """ + Move the DCM bragg, pitch and perp motors to the required positions + for the given energy in eV. + + """ + dcm_pos = get_dcm_motors_positions(energy_ev) + print( + f"Moving DCM bragg: {dcm_pos['bragg_angle']: .5g} mrad, " + f"DCM perp: {dcm_pos['perp']: .3g} mm, " + f"DCM pitch fixed at -5.304 mrad, " + # f"DCM pitch: {dcm_pos['dcm_pitch']: .5g} mrad, " + f"DCM Roll: {dcm_pos['dcm_roll']:.5g} V" + ) + # umv(EnergyDefaults.energy, dcm_pos["bragg_angle"]) + # umv(EnergyDefaults.mono_perp, dcm_pos["perp"]) + # umv(EnergyDefaults.mono_pitch, dcm_pos["dcm_pitch"]) + # umv(EnergyDefaults.mono_roll, dcm_pos["dcm_roll"]) + # print("\n***DCM Roll movement is currently disabled ***\n") + umv(EnergyDefaults.energy, dcm_pos["bragg_angle"], + EnergyDefaults.mono_perp, dcm_pos["perp"], + EnergyDefaults.mono_pitch, dcm_pos["dcm_pitch"], + EnergyDefaults.mono_roll, dcm_pos["dcm_roll"]) + + +def bl_energy(energy_ev, move_gap=False, mono_scan=True, plot=True): + """ + Adjusts the beamline's energy to the specified energy in electron volts (eV). + The function validates the target energy, checks the current energy, and makes + adjustments only if the energy difference is significant enough. It performs + necessary operations including moving the gap, changing DCM motors, updating + the mirror stripe, and scanning to find the optimal DCM pitch. + + Args: + energy_ev: Target energy in electron volts to which the beamline should be adjusted. + move_gap: Boolean flag indicating whether to adjust the gap before setting energy. + plot: Boolean flag indicating whether to plot the DCM pitch scan for finding the peak. + + Returns: + None + """ + energy_ev = validate_energy(energy_ev) # Ensure energy is valid. + + # Check current energy to avoid unnecessary adjustments. + + current_energy = get_current_energy() + energy_diff = calculate_energy_difference(current_energy, energy_ev) + + if energy_diff <= EnergyDefaults.min_energy_change: + print( + f"Energy change of {energy_diff:.2f} eV is too small, not changing energy." + ) + return + + # Step 1: Move the gap if needed. + move_gap_if_needed(energy_ev, move_gap) + + # Step 2: Move and set the DCM motors. + move_dcm_motors(energy_ev) + + # Step 3: Update the mirror stripe. + set_mirror_stripe(energy_ev) + + # Step 4: Perform DCM pitch scan and move to peak. + if mono_scan: + if plot: + mono_pitch_scan(plot=True) + else: + mono_pitch_scan(plot=False) diff --git a/pxii_bec/macros/pxii_parameters.py b/pxii_bec/macros/pxii_parameters.py new file mode 100755 index 0000000..a627783 --- /dev/null +++ b/pxii_bec/macros/pxii_parameters.py @@ -0,0 +1,256 @@ +"""File to store beamline parameters and defaults""" + +from dataclasses import dataclass +import numpy as np + + +@dataclass(frozen=True) +class FitDefaults: + """Default values for fitting routines""" + + # Constants for default models, baselines, and parameters + MODEL = "Voigt" + BASELINE = "Linear" + SETTLE_TIME = 0.1 + RELATIVE_MODE = True + + +@dataclass(frozen=True) +class EnergyDefaults: + """Parameters for PXII energy changes""" + + min_energy_change = 1 + min_energy_ev = 4800 + max_energy_ev = 30002 + beam_offset = 6 + signals = { + "sig1": dev.lu_bpmsum, + "sig2": dev.bsc_bpmsum, + "sig3": dev.bcu_bpmsum, + } + energy = dev.dcm_bragg + mono_pitch = dev.dcm_pitch + mono_perp = dev.dcm_perp + mono_roll = dev.dcm_froll + mono_roll_value = 4.65 + LUT_table = "luts/energy_lut.csv" + stripe_thresholds = {"silicon": 9000, "rhodium": 20000, "platinum": 40000} + pitch_scan = {"halfwidth": 0.075, "steps": 20} + + +@dataclass(frozen=True) +class Calibration: + """Calibration parameters for PXII optics""" + + pitch = np.array([4.61823701e-14, -1.97330772e-09, 2.89694543e-05, -5.34468669e00]) + roll = np.array([2.28291039e-03, -2.41928101e01]) + + +@dataclass(frozen=True) +class Gap: + harmonics = { + "H3": np.array([9.15e-04, 4.49e-01]), + "H5": np.array([5.19e-04, 7.149e-01]), + "H7": np.array([3.57694643e-04, 8.73775476e-01]), + "H9": np.array([2.76335714e-04, 8.98471905e-01]), + "H11": np.array([2.2225e-04, 9.582e-01]), + "H13": np.array([1.9e-4, 9.262e-01]), + "H15": np.array([1.67e-4, 8.83e-01]), + } + # Define harmonic ranges as a constant + harmonic_ranges = { + "H3": (4900, 7000), + "H5": (7000, 10000), + "H7": (10000, 13000), + "H9": (13000, 16000), + "H11": (16000, 19000), + "H13": (19000, 22000), + "H15": (22000, float("inf")), + } + + @staticmethod + def get_harmonic_by_energy(energy_ev: float): + """ + Determines the harmonic key based on the provided energy. + + Args: + energy_ev (float): The energy value (eV). + + Returns: + Optional[str]: The harmonic name (e.g., 'H3', 'H7') if the range matches, None otherwise. + """ + for harmonic, (low, high) in Gap.harmonic_ranges.items(): + if low < energy_ev <= high: + return harmonic + return None + + def get_harmonic_values(self, energy_ev: float): + """ + Retrieves the harmonic array based on the energy value. + + Args: + energy_ev (float): The energy value (eV). + + Returns: + Optional[np.array]: The corresponding array of harmonic values if the range matches, None otherwise. + """ + harmonic = self.get_harmonic_by_energy(energy_ev) + return self.harmonics.get(harmonic) if harmonic else None + + +@dataclass(frozen=True) +class Harmonics: + """Anuschka's harmonics data for PXII U19 Undulator""" + + min_gap_value = 4.5 + map = { + 3: np.array([1.07141725, -0.52834258]), + 4: np.array([0.007111, -0.13678409, 1.52295567, -1.06882785]), + 5: np.array([0.00315051, -0.0766359, 1.13107469, -0.86442062]), + 6: np.array([0.00162862, -0.04452336, 0.82948259, -0.37329674]), + 7: np.array([0.00080764, -0.02418882, 0.59212947, 0.15702886]), + 8: np.array([1.16699242e-03, -4.68207543e-02, 9.43972396e-01, -1.94201019e00]), + 9: np.array([0.00048419, -0.02010648, 0.55514114, -0.38180067]), + 10: np.array([2.11583333e-04, -8.03503571e-03, 3.43141595e-01, 6.02318000e-01]), + 11: np.array([0.00334392, -0.18405336, 3.58338994, -19.3640241]), + 12: np.array([3.20520798e-05, 2.39253145e-03, 8.09198503e-02, 2.22897377e00]), + 13: np.array([0.00278744, 0.07979874, 2.05143916]), + } + energy_ranges = { + 3: (0, 7), + 5: (7, 10), + 7: (10, 13), + 9: (13, 16), + 11: (16, 19), + 13: (19, 22), + } + high_energy = [(15, (23, 25)), (17, (25, 29)), (19, (29, float("inf")))] + + +@dataclass(frozen=True) +class CamConversion: + """Convert pixels to microns for sam cam""" + + a = 0.5208 + b = 0.002586 + + +@dataclass(frozen=True) +class BPMScans: + """Define the names of the motors and bpm channels""" + + fe = { + "x_name": dev.fe_bpm_x.name, + "y_name": dev.fe_bpm_y.name, + "z1_name": dev.fe_bpm1.name, + "z2_name": dev.fe_bpm2.name, + "z3_name": dev.fe_bpm3.name, + "z4_name": dev.fe_bpm3.name, + "z5_name": dev.fe_bpmsum.name, + "x_device": dev.fe_bpm_x, + "y_device": dev.fe_bpm_y, + } + lu = { + "x_name": dev.lu_bpm_x.name, + "y_name": dev.lu_bpm_y.name, + "z1_name": dev.lu_bpm1.name, + "z2_name": dev.lu_bpm2.name, + "z3_name": dev.lu_bpm3.name, + "z4_name": dev.lu_bpm4.name, + "z5_name": dev.lu_bpmsum.name, + "x_device": dev.lu_bpm_x, + "y_device": dev.lu_bpm_y, + } + bsc = { + "x_name": dev.bsc_bpm_x.name, + "y_name": dev.bsc_bpm_y.name, + "z1_name": dev.bsc_bpm1.name, + "z2_name": dev.bsc_bpm2.name, + "z3_name": dev.bsc_bpm3.name, + "z4_name": dev.bsc_bpm4.name, + "z5_name": dev.bsc_bpmsum.name, + "x_device": dev.bsc_bpm_x, + "y_device": dev.bsc_bpm_y, + } + bcu = { + "x_name": dev.bcu_bpm_x.name, + "y_name": dev.bcu_bpm_y.name, + "z1_name": dev.bcu_bpm1.name, + "z2_name": dev.bcu_bpm2.name, + "z3_name": dev.bcu_bpm3.name, + "z4_name": dev.bcu_bpm4.name, + "z5_name": dev.bcu_bpmsum.name, + "x_device": dev.bcu_bpm_x, + "y_device": dev.bcu_bpm_y, + } + + +@dataclass(frozen=True) +class MirrorConfig: + """Define the names of the mirror channels""" + + hfm = { + "bu_name": dev.hfm_bu.name, + "bd_name": dev.hfm_bd.name, + "z_name": dev.samcam_xsig.name, + "x_device": dev.hfm_bu, + "y_device": dev.hfm_bd, + } + vfm = { + "bu_name": dev.vfm_bu.name, + "bd_name": dev.vfm_bd.name, + "z_name": dev.samcam_ysig.name, + "x_device": dev.vfm_bu, + "y_device": dev.vfm_bd, + } + +from typing import Callable +@dataclass +class Target: + inpos: float + outpos: float + tol: float + mot: str + reader: Callable[[], float] + + @property + def actual(self): + return self.reader() + + def checkpos(self): + return abs(self.actual - self.inpos) <= self.tol + + def mvin(self): + umv(self.mot, self.inpos) + + def mvout(self): + umv(self.mot, self.outpos) + + +@dataclass +class GroupTarget: + def __init__(self, **targets: Target): + self.targets = targets + def checkpos(self): + return all(t.checkpos() for t in self.targets.values()) + def report(self): + return {name: t.checkpos() for name, t in self.targets.items()} + + +@dataclass(frozen=True) +class SE: + """Define settings for scintillator, collimator, i1""" + scin = Target(38.6, 20.0, 0.1, dev.scin_y, lambda: dev.scin_y.read()['scin_y']['value']) + i1 = Target(44.0, 20.0, 0.2, dev.scin_y, lambda: dev.scin_y.read()['scin_y']['value']) + colly = Target(41.5, 20.0, 0.05, dev.coll_y, lambda: dev.coll_y.read()['coll_y']['value']) + bsy = Target(0.1, -0.9, 0.05, dev.bs_y, lambda: dev.bs_y.read()['bs_y']['value']) + bsx = Target(2.45, 2.45, 0.05, dev.bs_x, lambda: dev.bs_x.read()['bs_x']['value']) + # coll = GroupTarget( + # x = Target(0.0517, 0.0517, 0.02, dev.coll_x, lambda: dev.coll_x.read()['coll_x']['value']), + # y = Target(41.5, 20.0, 0.05, dev.coll_y, lambda: dev.coll_y.read()['coll_y']['value']), + # ) + # bs = GroupTarget( + # x = Target(2.65, 2.65, 0.05, dev.bs_x, lambda: dev.bs_x.read()['bs_x']['value']), + # y = Target(0.1, 0.1, 0.05, dev.bs_y, lambda: dev.bs_y.read()['bs_y']['value']) + # ) + \ No newline at end of file