import time
from time import sleep

import numpy as np
from epics import PV

from logzero import logger as log

from slic.core.adjustable import Adjustable, PVAdjustable, PVEnumAdjustable
from slic.core.scanner.scanbackend import wait_for_all  # , stop_all

import subprocess

UND_NAME_FMT = "SARUN{:02}-UIND030"
N_UND_CHIC = None

N_UNDS = list(range(3, 15 + 1))
# N_UNDS.remove(N_UND_CHIC) # does not exist

# from Alvra mono calibration
# energy_offset = 20.37839

# Cristallina without calibration
# offset is the difference between PSSS and undulator setpoint
# sign convention: Undulator - PSSS
energy_offset = 0  # eV
# PSSS = 8355 eV, machine = 8253 eV

# move the PSSS motor according to the energy
# TODO: improve this hack
PSSS_MOVE = True


def set_PSSS_energy(energy: float):
    """Make the PSSS spectrometer follow an undulator energy change.

    The module-level ``energy_offset`` is subtracted from *energy*, the result
    is written to the ``SARFE10-PSSS059:ENERGY`` PV (blocking put), and the
    external ``PSSS_motion.py`` script is then run to move the motors.

    A non-zero return code of the script is only logged as a warning; no
    exception is raised.
    """
    energy = energy - energy_offset
    print(f"Adjusting PSSS to {energy}")
    PSSS_energy_PV_name = "SARFE10-PSSS059:ENERGY"
    PSSS_energy_PV = PV(PSSS_energy_PV_name)
    PSSS_energy_PV.put(energy, wait=True)

    ret = subprocess.run(["python", "/sf/photo/src/PSSS_motor/qt/PSSS_motion.py", "-s", "-m5", "SARFE10-PSSS059"])

    # NOTE(review): fixed settling delay after the motion script -- confirm it is still needed
    sleep(2)

    if ret.returncode != 0:
        log.warning("WARNING: PSSS adjustment failed.")
    else:
        print("Finished adjusting PSSS.")


class Undulators(Adjustable):
    """
    Photon-energy adjustable that sets the K value of a group of undulators.

    for n_und_ref=None (default), the reference undulator currently used by the machine will be used
    """

    def __init__(
        self, n_unds=N_UNDS, n_und_ref=None, scaled=True, ID="ARAMIS_UNDULATORS", name="Aramis Undulators", units="eV"
    ):
        """Create one ``Undulator`` adjustable per entry of *n_unds*.

        Parameters:
            n_unds: iterable of undulator numbers; it is copied, so the
                caller's sequence (and the module default) is never mutated.
            n_und_ref: number of the reference undulator, or None to use the
                one reported by ``get_machine_n_und_ref()``.
            scaled: if True, ``set_target_value`` scales each undulator's
                current K; if False, all undulators get the same K.
            ID, name, units: forwarded to ``Adjustable.__init__``.

        Raises:
            ValueError: if no reference undulator is given and none can be
                read from the machine, or if the reference undulator is not
                part of *n_unds*.
        """
        # # don't allow setting these since there's no chic :)
        # chic_fudge_offset = 0
        # adjust_chic = False

        super().__init__(ID, name=name, units=units)

        machine_n_und_ref = get_machine_n_und_ref()

        if n_und_ref is None:
            if machine_n_und_ref is None:
                raise ValueError(
                    "could not read reference undulator currently used by the machine, please specify n_und_ref"
                )
            n_und_ref = machine_n_und_ref

        if n_und_ref != machine_n_und_ref:
            log.warning(
                f"the chosen reference undulator ({n_und_ref}) is not the reference undulator currently used by the machine ({machine_n_und_ref})"
            )

        n_unds = list(n_unds)

        if N_UND_CHIC in n_unds:
            log.warning(
                f"the CHIC ({N_UND_CHIC}) is in the list of active undulators: {n_unds}, and will be ignored/removed"
            )
            n_unds.remove(N_UND_CHIC)

        if n_und_ref not in n_unds:
            raise ValueError(f"the reference undulator ({n_und_ref}) is not in the list of active undulators: {n_unds}")

        self.n_unds = n_unds
        self.n_und_ref = n_und_ref

        self.und_names = und_names = [UND_NAME_FMT.format(n) for n in n_unds]
        self.und_name_cal = und_name_cal = UND_NAME_FMT.format(n_und_ref)

        # maps PV base name -> Undulator adjustable
        self.adjs = {name: Undulator(name) for name in und_names}
        # self.chic = CHIC(chic_fudge_offset, name, units)
        # self.phases = Phases(n_unds)

        # self.adjust_chic = adjust_chic
        self.scaled = scaled

        self.convert = ConverterEK()

        # the scaler reads the current energy from the reference undulator
        a = self.adjs[und_name_cal]
        self.scale = ScalerEK(a)

    def set_target_value(self, value, hold=False):
        """Move all undulators so that the photon energy becomes *value*.

        ``energy_offset`` is added to *value* before it is converted to K.
        If the converted K is NaN, a message is printed and None is returned
        (no task is created). Otherwise the task built by ``self._as_task``
        is returned; it sets the K values, waits for all of them, sleeps, and
        (if ``PSSS_MOVE`` is set) adjusts the PSSS.
        """
        value = value + energy_offset
        k = self.convert.K(value)
        if np.isnan(k):
            print("K is nan for", value)
            return
        print(f"{k} <- {value}")

        ks_current = [a.get_current_value(readback=False) for a in self.adjs.values()]

        if self.scaled:
            header = "scaled: "
            ks_target = self.scale.K(value, ks_current)
        else:
            header = "all equal:"
            ks_target = k * np.ones_like(ks_current)

        print(header, ks_target)
        print()

        def change():
            # TODO: replace by set_all_target_values_and_wait when print not needed anymore
            tasks = []
            for (name, a), k_old, k_new in zip(self.adjs.items(), ks_current, ks_target):
                delta = k_old - k_new
                print(f"{name}: {k_old}\t->\t{k_new}\t({delta})")
                if np.isnan(k_new):
                    print(f"{name} skipped since target K is nan")
                    continue
                t = a.set_target_value(k_new, hold=False)
                tasks.append(t)
            wait_for_all(tasks)

            # # make sure new K values have been written TODO: needed?
            # sleep(2) # check if this can be shortened back to 0.5

            # # switching on radial motors ...
            # wait_for_all([
            #     a.adj_radial_on.set_target_value(1, hold=False) for a in self.adjs.values()
            # ])

            # # press a few times
            # for _ in range(3):
            #     # ... and pushing go to ensure proper movements
            #     wait_for_all([
            #         a.adj_radial_go.set_target_value(1, hold=False) for a in self.adjs.values()
            #     ])
            #     sleep(0.2)

            # # make sure the undulators finished moving TODO: needed?
            # sleep(5)

            # if self.adjust_chic:
            #     print("CHIC adjustment follows")
            ##    self.chic.set_target_value(value, hold=False).wait() #TODO: test whether an additional sleep is needed
            #     self.phases.set(value)
            #     print("CHIC adjustment done")
            # else:
            #     print("CHIC adjustment skipped")

            # make sure the undulators and phases finished moving TODO: needed?
            sleep(5)

            if not PSSS_MOVE:
                print("no PSSS movement enabled")
            else:
                # value still contains energy_offset; set_PSSS_energy subtracts it again
                set_PSSS_energy(value)

        return self._as_task(change, hold=hold)

    def get_current_value(self):
        """Return the photon energy derived from the reference undulator's K.

        The K of the reference undulator is converted to an energy and
        ``energy_offset`` is subtracted; the other undulators are not read.
        """
        n = self.und_name_cal
        a = self.adjs[n]
        k = a.get_current_value()
        energy = self.convert.E(k) - energy_offset

        # all_ks = [a.get_current_value() for a in self.adjs.values()]
        # checks = np.isclose(all_ks, k, rtol=0, atol=0.001)
        # if not all(checks):
        #     print(f"Warning: Ks are not all close to {k}:")
        #     for name, k, chk in zip(self.adjs.keys(), all_ks, checks):
        #         if not chk:
        #             print(name, k)

        return energy  # if we need to fudge the number to match the mono, do it here!

    def is_moving(self):
        """Return True if any of the individual undulators reports moving."""
        # self.adjs is a dict: iterate the adjustables, not the name keys
        return any(a.is_moving() for a in self.adjs.values())


class Undulator(PVAdjustable):
    """K-value adjustable of a single undulator, addressed by its PV base name."""

    def __init__(self, name, accuracy=0.0005):
        """Bind to ``<name>:K_SET`` (used for both set value and readback).

        Parameters:
            name: PV base name; also used as the adjustable's name.
            accuracy: forwarded to ``PVAdjustable.__init__``.
        """
        pvname_setvalue = name + ":K_SET"
        pvname_readback = pvname_setvalue  # name + ":K_READ" #TODO: there are no readback values?
        super().__init__(
            pvname_setvalue,
            pvname_readback=pvname_readback,
            accuracy=accuracy,
            active_move=True,
            name=name,
            internal=True,
        )
        self.adj_energy = PVAdjustable(name + ":FELPHOTENE", internal=True)
        # self.adj_radial_on = PVAdjustable(name + ":RADIAL-ON", internal=True) #TODO: do not exist
        # self.adj_radial_go = PVAdjustable(name + ":RADIAL-GO", internal=True) #TODO: do not exist
        # self.adj_radial_on_proc = PVAdjustable(name + ":RADIAL-ON.PROC", internal=True)
        # self.adj_radial_go_proc = PVAdjustable(name + ":RADIAL-GO.PROC", internal=True)

    @property
    def energy(self):
        """Current value of ``<name>:FELPHOTENE`` multiplied by 1000."""
        # NOTE(review): presumably converts keV to eV -- confirm the PV's unit
        return self.adj_energy.get_current_value() * 1000


class ConverterEK:
    """Convert between photon energy E and undulator parameter K.

    Uses ``E = const * gamma**2 / (1 + K**2 / 2)`` with
    ``const = 2 * h * c / lambda_u``; gamma is computed from the electron
    energy PV on every call.
    """

    h = 4.135667696e-15  # eV * s
    c = 299792458  # m / s
    lambda_u = 15e-3  # m
    const = 2 * h * c / lambda_u  # eV
    electron_rest_energy = 0.51099895  # MeV

    # was: pvname_electron_energy="SATCL01-MBND100:P-READ"
    # was: pvname_electron_energy="SATCB01:ENE-FILT-OP"
    def __init__(self, pvname_electron_energy="SARCL02-MBND100:P-READ"):  # S30CB13:ENE-FILT-OP
        """Connect to the PV the electron energy is read from."""
        self.pv_electron_energy = PV(pvname_electron_energy)

    def K(self, energy):
        """Return the K value for photon energy *energy*.

        The result is NaN when *energy* exceeds ``get_factor()``, since the
        square root argument is then negative.
        """
        f = self.get_factor()
        v = f / energy - 1
        return np.sqrt(2 * v)

    def E(self, k_value):
        """Return the photon energy for K value *k_value*."""
        f = self.get_factor()
        v = 1 + k_value**2 / 2
        return f / v

    def get_factor(self):
        """Return ``const * gamma**2``, i.e. the photon energy at K = 0."""
        return self.const * self.get_gamma_squared()

    def get_gamma_squared(self):
        """Return the squared ratio of the electron energy PV to the rest energy."""
        # NOTE(review): assumes the PV reports MeV, like electron_rest_energy -- confirm
        electron_energy = self.pv_electron_energy.get()
        gamma = electron_energy / self.electron_rest_energy
        return gamma**2


class ScalerEK:
    """Scale K values relative to the energy reported by a reference undulator."""

    def __init__(self, und_reference):
        """Store the reference undulator (needs ``energy`` and ``get_current_value``)."""
        self.und = und_reference

    def K(self, energy_target, K_current=None):
        """Return the K value(s) that move the energy to *energy_target*.

        Computes ``sqrt((E_current / E_target) * (K_current**2 + 2) - 2)``
        element-wise, where ``E_current`` is the reference undulator's
        ``energy``. *K_current* may be a scalar or a sequence; if None, the
        reference undulator's current value is used. Entries whose radicand
        is negative come out as NaN.
        """
        if K_current is None:
            K_current = self.und.get_current_value()
        K_current = np.asarray(K_current)
        energy_current = self.und.energy
        energy_ratio = energy_current / energy_target
        K_target_squared = energy_ratio * (K_current**2 + 2) - 2
        return np.sqrt(K_target_squared)


# class CHIC(PVAdjustable):
#
#     def __init__(self, fudge_offset, name, units):
#         self.fudge_offset = fudge_offset
#         name += " CHIC Energy"
#         super().__init__("SATUN-CHIC:PHOTON-ENERGY", name=name)
#         self.pvs.start = PV("SATUN-CHIC:APPLY-DELAY-OFFSET-PHASE")
#         self.units = units
#
#
#     def set_target_value(self, value, hold=False):
#         fudge_offset = self.fudge_offset
#         print("CHIC fudge offset is", fudge_offset)
#         value -= fudge_offset
#         value /= 1000
#
#         def change():
#             sleep(1)
#             print("CHIC setvalue")
#             self.pvs.setvalue.put(value, wait=True)
#             print("CHIC start")
#             self.pvs.start.put(1, wait=True)
#             #TODO: test whether an additional sleep is needed
#             sleep(1)
#
#         return self._as_task(change, hold=hold)
#
#
#     def get_current_value(self):
#         return super().get_current_value() * 1000


# class TwoColorChicane(PVAdjustable):
#
#     def __init__(self, name):#, t0=0):
##        self.t0 = t0
##        name += " Two Color Chicane"
#         super().__init__("SATUN14-MBND100:I-SET", "SATUN14-MBND100:I-READ", process_time=1, name=name)
#
##    def set_target_value(self, value, hold=False):
##        super().set_target_value(value)
#
##    def get_current_value(self):
##        return super().get_current_value()


# class Phases:
#
#     def __init__(self, n_unds=N_UNDS):
#         # 22 does not have a chicane
#         n_unds = n_unds.copy()
#         if 22 in n_unds:
#             n_unds.remove(22)
#
#         # 22 does not have a chicane
#         n_unds_all = N_UNDS.copy()
#         if 22 in n_unds_all:
#             n_unds_all.remove(22)
#
#         self.cb_auto_good = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in n_unds}
#         self.cb_auto_bad  = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in set(n_unds_all) - set(n_unds)}
#
#         self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY")
#         self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY")
#
#
#     def set(self, energy):
#         for cb in self.cb_auto_good.values(): cb.put(int(False))
#         for cb in self.cb_auto_bad.values():  cb.put(int(False))
#         sleep(0.1)
#
#         pv_fixed_energy = self.pv_fixed_energy
#
#         current_state = pv_fixed_energy.get()
#         pv_fixed_energy.put(int(True)) # enforce fixed energy
#
#         self.pv_energy.put(energy / 1000) # in keV
#         sleep(0.1)
#
#         for cb in self.cb_auto_good.values(): cb.put(int(True))
#         sleep(0.1)
#
#         for cb in self.cb_auto_good.values(): cb.put(int(False))
#         for cb in self.cb_auto_bad.values():  cb.put(int(False))
#         sleep(0.1)
#
##        pv_fixed_energy.put(current_state) # reset to original state


# class Phases:

#     def __init__(self, n_unds=N_UNDS):
#         # 22 does not have a chicane
#         n_unds = n_unds.copy()
#         if 22 in n_unds:
#             n_unds.remove(22)
#         self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY")
#         self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY")
#         self.adjs_apply = {i: PVAdjustable(f"SATUN{i:02}-CHIC:APPLY-DOP", internal=True) for i in n_unds}

#     def set(self, energy):
#         pv_energy = self.pv_energy
#         pv_fixed_energy = self.pv_fixed_energy

#         current_state = pv_fixed_energy.get()
#         pv_fixed_energy.put(int(True)) # enforce fixed energy

#         pv_energy.put(energy / 1000) # in keV
#         sleep(0.1)
#         self.update()
##        sleep(10)

#         pv_fixed_energy.put(current_state) # reset to original state

#     def update(self):
#         wait_for_all([
#             adj_apply.set_target_value(1) for adj_apply in self.adjs_apply.values()
#         ])


def get_machine_n_und_ref():
    """Return the number of the reference undulator currently used by the machine.

    Reads ``SARUN:REF-UND`` up to three times, sleeping one second after each
    attempt that raises ``AttributeError``. The value is expected to look like
    ``"SARUN<number>..."``; the part after the ``"SARUN"`` prefix is parsed
    as an int.

    Returns:
        The undulator number as int, or None if the value could not be read
        in any attempt, does not start with ``"SARUN"``, or has a remainder
        that is not an integer.
    """
    prefix = "SARUN"

    for _ in range(3):
        try:
            res = PVEnumAdjustable("SARUN:REF-UND").get()
            if not res.startswith(prefix):
                return None
        except AttributeError as e:
            print("Error getting undulator, retrying")
            print(e)
            time.sleep(1)
        else:
            break
    else:
        # every attempt failed: report "unknown" instead of crashing on a
        # value that was never successfully read
        return None

    try:
        return int(res[len(prefix):])
    except ValueError:
        return None