From 17d28531d4b0c65f5c0a6af995cd1b2e39c90ae2 Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Tue, 1 Mar 2022 11:07:15 +0100 Subject: [PATCH] first working version for aramis undulators --- undulator.py | 381 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 381 insertions(+) create mode 100644 undulator.py diff --git a/undulator.py b/undulator.py new file mode 100644 index 0000000..2f82646 --- /dev/null +++ b/undulator.py @@ -0,0 +1,381 @@ +from time import sleep +import numpy as np +from epics import PV + +from logzero import logger as log + +from slic.core.adjustable import Adjustable, PVAdjustable, PVEnumAdjustable +from slic.core.scanner.scanbackend import wait_for_all #, stop_all + + +UND_NAME_FMT = "SARUN{:02}-UIND030" +N_UND_CHIC = None + +N_UNDS = list(range(3, 15+1)) +#N_UNDS.remove(N_UND_CHIC) # does not exist + +energy_offset = 20.37839 + +class Undulators(Adjustable): + """ + for n_und_ref=None (default), the reference undulator currently used by the machine will be used + """ + + def __init__(self, n_unds=N_UNDS, n_und_ref=None, scaled=True, ID="ARAMIS_UNDULATORS", name="Aramis Undulators", units="eV"): +# # don't allow setting these since there's no chic :) +# chic_fudge_offset = 0 +# adjust_chic = False + + super().__init__(ID, name=name, units=units) + + + machine_n_und_ref = get_machine_n_und_ref() + + if n_und_ref is None: + if machine_n_und_ref is None: + raise ValueError(f"could not read reference undulator currently used by the machine, please specify n_und_ref") + n_und_ref = machine_n_und_ref + + if n_und_ref != machine_n_und_ref: + log.warning(f"the chosen reference undulator ({n_und_ref}) is not the reference undulator currently used by the machine ({machine_n_und_ref})") + + + n_unds = list(n_unds) + + if N_UND_CHIC in n_unds: + log.warning(f"the CHIC ({N_UND_CHIC}) is in the list of active undulators: {n_unds}, and will be ignored/removed") + n_unds.remove(N_UND_CHIC) + + if n_und_ref not in n_unds: + raise ValueError(f"the reference undulator ({n_und_ref}) is not in the list of active undulators: {n_unds}") + + + self.n_unds = n_unds + self.n_und_ref = n_und_ref + + self.und_names = und_names = [UND_NAME_FMT.format(n) for n in n_unds] + self.und_name_cal = und_name_cal = UND_NAME_FMT.format(n_und_ref) + + self.adjs = {name: Undulator(name) for name in und_names} +# self.chic = CHIC(chic_fudge_offset, name, units) +# self.phases = Phases(n_unds) + + +# self.adjust_chic = adjust_chic + self.scaled = scaled + + self.convert = ConverterEK() + + a = self.adjs[und_name_cal] + self.scale = ScalerEK(a) + + + + def set_target_value(self, value, hold=False): + value = value + energy_offset + k = self.convert.K(value) + if np.isnan(k): + print("K is nan for", value) + return + print(f"{k} <- {value}") + + ks_current = [a.get_current_value(readback=False) for a in self.adjs.values()] + + if self.scaled: + header = "scaled: " + ks_target = self.scale.K(value, ks_current) + else: + header = "all equal:" + ks_target = k * np.ones_like(ks_current) + + print(header, ks_target) + print() + + def change(): + #TODO: replace by set_all_target_values_and_wait when print not needed anymore + tasks = [] + for (name, a), k_old, k_new in zip(self.adjs.items(), ks_current, ks_target): + delta = k_old - k_new + print(f"{name}: {k_old}\t->\t{k_new}\t({delta})") + if np.isnan(k_new): + print(f"{name} skipped since target K is nan") + continue + t = a.set_target_value(k_new, hold=False) + tasks.append(t) + wait_for_all(tasks) + +# # make sure new K values have been written TODO: needed? +# sleep(2) # check if this can be shortened back to 0.5 + +# # switching on radial motors ... +# wait_for_all([ +# a.adj_radial_on.set_target_value(1, hold=False) for a in self.adjs.values() +# ]) + +# # press a few times +# for _ in range(3): +# # ... and pushing go to ensure proper movements +# wait_for_all([ +# a.adj_radial_go.set_target_value(1, hold=False) for a in self.adjs.values() +# ]) +# sleep(0.2) + +# # make sure the undulators finished moving TODO: needed? +# sleep(5) + +# if self.adjust_chic: +# print("CHIC adjustment follows") +## self.chic.set_target_value(value, hold=False).wait() #TODO: test whether an additional sleep is needed +# self.phases.set(value) +# print("CHIC adjustment done") +# else: +# print("CHIC adjustment skipped") + + # make sure the undulators and phases finished moving TODO: needed? + sleep(5) + + return self._as_task(change, hold=hold) + + + def get_current_value(self): + n = self.und_name_cal + a = self.adjs[n] + k = a.get_current_value() + energy = self.convert.E(k) - energy_offset + +# all_ks = [a.get_current_value() for a in self.adjs.values()] +# checks = np.isclose(all_ks, k, rtol=0, atol=0.001) +# if not all(checks): +# print(f"Warning: Ks are not all close to {k}:") +# for name, k, chk in zip(self.adjs.keys(), all_ks, checks): +# if not chk: +# print(name, k) + + return energy # if we need to fudge the number to match the mono, do it here! + + + def is_moving(self): + return any(a.is_moving() for a in self.adjs) + + + +class Undulator(PVAdjustable): + + def __init__(self, name, accuracy=0.0005): + pvname_setvalue = name + ":K_SET" + pvname_readback = pvname_setvalue # name + ":K_READ" #TODO: there are no readback values? + super().__init__(pvname_setvalue, pvname_readback=pvname_readback, accuracy=accuracy, active_move=True, name=name, internal=True) + self.adj_energy = PVAdjustable(name + ":FELPHOTENE", internal=True) +# self.adj_radial_on = PVAdjustable(name + ":RADIAL-ON", internal=True) #TODO: do not exist +# self.adj_radial_go = PVAdjustable(name + ":RADIAL-GO", internal=True) #TODO: do not exist +# self.adj_radial_on_proc = PVAdjustable(name + ":RADIAL-ON.PROC", internal=True) +# self.adj_radial_go_proc = PVAdjustable(name + ":RADIAL-GO.PROC", internal=True) + + @property + def energy(self): + return self.adj_energy.get_current_value() * 1000 + + + +class ConverterEK: + + h = 4.135667696e-15 # eV * s + c = 299792458 # m / s + lambda_u = 15e-3 # m + const = 2 * h * c / lambda_u # eV + + electron_rest_energy = 0.51099895 # MeV + + + # was: pvname_electron_energy="SATCL01-MBND100:P-READ" + # was: pvname_electron_energy="SATCB01:ENE-FILT-OP" + def __init__(self, pvname_electron_energy="SARCL02-MBND100:P-READ"): #S30CB13:ENE-FILT-OP + self.pv_electron_energy = PV(pvname_electron_energy) + + def K(self, energy): + f = self.get_factor() + v = f / energy - 1 + return np.sqrt(2 * v) + + def E(self, k_value): + f = self.get_factor() + v = 1 + k_value**2 / 2 + return f / v + + def get_factor(self): + return self.const * self.get_gamma_squared() + + def get_gamma_squared(self): + electron_energy = self.pv_electron_energy.get() + gamma = electron_energy / self.electron_rest_energy + return gamma**2 + + + +class ScalerEK: + + def __init__(self, und_reference): + self.und = und_reference + + def K(self, energy_target, K_current=None): + if K_current is None: + K_current = self.und.get_current_value() + K_current = np.asarray(K_current) + energy_current = self.und.energy + energy_ratio = energy_current / energy_target + K_target_squared = energy_ratio * (K_current**2 + 2) - 2 + return np.sqrt(K_target_squared) + + + +#class CHIC(PVAdjustable): +# +# def __init__(self, fudge_offset, name, units): +# self.fudge_offset = fudge_offset +# name += " CHIC Energy" +# super().__init__("SATUN-CHIC:PHOTON-ENERGY", name=name) +# self.pvs.start = PV("SATUN-CHIC:APPLY-DELAY-OFFSET-PHASE") +# self.units = units +# +# +# def set_target_value(self, value, hold=False): +# fudge_offset = self.fudge_offset +# print("CHIC fudge offset is", fudge_offset) +# value -= fudge_offset +# value /= 1000 +# +# def change(): +# sleep(1) +# print("CHIC setvalue") +# self.pvs.setvalue.put(value, wait=True) +# print("CHIC start") +# self.pvs.start.put(1, wait=True) +# #TODO: test whether an additional sleep is needed +# sleep(1) +# +# return self._as_task(change, hold=hold) +# +# +# def get_current_value(self): +# return super().get_current_value() * 1000 + + + + + +#class TwoColorChicane(PVAdjustable): +# +# def __init__(self, name):#, t0=0): +## self.t0 = t0 +## name += " Two Color Chicane" +# super().__init__("SATUN14-MBND100:I-SET", "SATUN14-MBND100:I-READ", process_time=1, name=name) +# +## def set_target_value(self, value, hold=False): +## super().set_target_value(value) +# +## def get_current_value(self): +## return super().get_current_value() + + + + + +#class Phases: +# +# def __init__(self, n_unds=N_UNDS): +# # 22 does not have a chicane +# n_unds = n_unds.copy() +# if 22 in n_unds: +# n_unds.remove(22) +# +# # 22 does not have a chicane +# n_unds_all = N_UNDS.copy() +# if 22 in n_unds_all: +# n_unds_all.remove(22) +# +# self.cb_auto_good = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in n_unds} +# self.cb_auto_bad = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in set(n_unds_all) - set(n_unds)} +# +# self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY") +# self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY") +# +# +# def set(self, energy): +# for cb in self.cb_auto_good.values(): cb.put(int(False)) +# for cb in self.cb_auto_bad.values(): cb.put(int(False)) +# sleep(0.1) +# +# pv_fixed_energy = self.pv_fixed_energy +# +# current_state = pv_fixed_energy.get() +# pv_fixed_energy.put(int(True)) # enforce fixed energy +# +# self.pv_energy.put(energy / 1000) # in keV +# sleep(0.1) +# +# for cb in self.cb_auto_good.values(): cb.put(int(True)) +# sleep(0.1) +# +# for cb in self.cb_auto_good.values(): cb.put(int(False)) +# for cb in self.cb_auto_bad.values(): cb.put(int(False)) +# sleep(0.1) +# +## pv_fixed_energy.put(current_state) # reset to original state + + + + + +#class Phases: + +# def __init__(self, n_unds=N_UNDS): +# # 22 does not have a chicane +# n_unds = n_unds.copy() +# if 22 in n_unds: +# n_unds.remove(22) +# self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY") +# self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY") +# self.adjs_apply = {i: PVAdjustable(f"SATUN{i:02}-CHIC:APPLY-DOP", internal=True) for i in n_unds} + + +# def set(self, energy): +# pv_energy = self.pv_energy +# pv_fixed_energy = self.pv_fixed_energy + +# current_state = pv_fixed_energy.get() +# pv_fixed_energy.put(int(True)) # enforce fixed energy + +# pv_energy.put(energy / 1000) # in keV + +# sleep(0.1) +# self.update() +## sleep(10) + +# pv_fixed_energy.put(current_state) # reset to original state + + +# def update(self): +# wait_for_all([ +# adj_apply.set_target_value(1) for adj_apply in self.adjs_apply.values() +# ]) + + + + + +def get_machine_n_und_ref(): + res = PVEnumAdjustable("SARUN:REF-UND").get() + if not res.startswith("SARUN"): + return None + n = len("SARUN") + res = res[n:] + try: + res = int(res) + except ValueError: + return None + return res + + + + +