"""Adjustables for the Athos undulators, their phase chicanes and the two-color chicane.

The module talks to EPICS PVs (via ``epics.PV`` and the slic ``PVAdjustable``
classes); nothing in here can be exercised without access to those PVs.
"""

from time import sleep
from datetime import datetime
import numpy as np
from epics import PV
from logzero import logger as log

from slic.core.adjustable import Adjustable, PVAdjustable, PVEnumAdjustable
from slic.core.scanner.scanbackend import wait_for_all #, stop_all
from slic.utils import cprint, tqdm_sleep

from .delay_current import DelayCurrentConverter


# PV name prefix of a single undulator, formatted with the undulator number
UND_NAME_FMT = "SATUN{:02}-UIND030"

# position number occupied by the CHIC; it is never treated as an undulator
N_UND_CHIC = 14

# default undulator numbers: 6..22 without the CHIC position
N_UNDS = list(range(6, 22+1))
N_UNDS.remove(N_UND_CHIC)


class Undulators(Adjustable):
    """
    Group of undulators that is set via a single target energy.

    The target value is converted to an undulator K value and written to every
    undulator in the group; the current value is the energy computed from the
    K value of the reference undulator.

    for n_und_ref=None (default), the reference undulator currently used by the machine will be used

    Parameters:
        n_unds: iterable of undulator numbers; it is copied, and N_UND_CHIC is
            removed from the copy (with a warning) if present.
        n_und_ref: number of the reference undulator; must be in n_unds.
        chic_fudge_offset: currently unused (only referenced by the
            commented-out CHIC adjustable).
        adjust_chic: if True, the phases are updated after the undulators moved.
        scaled: if True, each undulator's current K is scaled by ScalerEK;
            otherwise all undulators get the same K.
        ID, name, units: forwarded to Adjustable.

    Raises:
        ValueError: if no reference undulator is given and none could be read
            from the machine, or if the reference undulator is not in n_unds.
    """

    def __init__(self, n_unds=N_UNDS, n_und_ref=None, chic_fudge_offset=0, adjust_chic=False, scaled=True, ID="ATHOS_UNDULATORS", name="Athos Undulators", units="eV"):
        super().__init__(ID, name=name, units=units)

        machine_n_und_ref = get_machine_n_und_ref()

        if n_und_ref is None:
            if machine_n_und_ref is None:
                raise ValueError("could not read reference undulator currently used by the machine, please specify n_und_ref")
            n_und_ref = machine_n_und_ref

        if n_und_ref != machine_n_und_ref:
            log.warning(f"the chosen reference undulator ({n_und_ref}) is not the reference undulator currently used by the machine ({machine_n_und_ref})", stacklevel=3)

        # work on a copy so that the (shared, mutable) default is never modified
        n_unds = list(n_unds)

        if N_UND_CHIC in n_unds:
            log.warning(f"the CHIC ({N_UND_CHIC}) is in the list of active undulators: {n_unds}, and will be ignored/removed", stacklevel=3)
            n_unds.remove(N_UND_CHIC)

        if n_und_ref not in n_unds:
            raise ValueError(f"the reference undulator ({n_und_ref}) is not in the list of active undulators: {n_unds}")

        self.n_unds = n_unds
        self.n_und_ref = n_und_ref

        self.und_names = und_names = [UND_NAME_FMT.format(n) for n in n_unds]
        self.und_name_cal = und_name_cal = UND_NAME_FMT.format(n_und_ref)

        # maps undulator name -> Undulator adjustable
        self.adjs = {name: Undulator(name) for name in und_names}
#        self.chic = CHIC(chic_fudge_offset, name, units)
        self.phases = Phases(n_unds)

        self.adjust_chic = adjust_chic
        self.scaled = scaled

        self.convert = ConverterEK()

        # the scaler works relative to the reference undulator
        a = self.adjs[und_name_cal]
        self.scale = ScalerEK(a)


    def set_target_value(self, value, hold=False):
        """
        Move all undulators to the K values corresponding to the energy `value`.

        Returns the task created by self._as_task for the actual move, or None
        (after printing a message) if the converted K is nan.
        """
        k = self.convert.K(value)
        if np.isnan(k):
            print("K is nan for", value)
            return
        print(f"{k} <- {value}")

        # current K set values (not readbacks) in the order of self.adjs
        ks_current = [a.get_current_value(readback=False) for a in self.adjs.values()]

        if self.scaled:
            header = "scaled: "
            ks_target = self.scale.K(value, ks_current)
        else:
            header = "all equal:"
            ks_target = k * np.ones_like(ks_current)

        print(header, ks_target)
        print()

        def change():
            #TODO: replace by set_all_target_values_and_wait when print not needed anymore
            tasks = {}
            for (name, a), k_old, k_new in zip(self.adjs.items(), ks_current, ks_target):
                delta = k_old - k_new
                print(f"{name}: {k_old}\t->\t{k_new}\t({delta})")
                if np.isnan(k_new):
                    print(f"{name} skipped since target K is nan")
                    continue
                t = a.set_target_value(k_new, hold=False)
                tasks[name] = t
#            wait_for_all(tasks)
            for n, t in tasks.items():
                print("waiting for ", n)
                t.wait()
                print("target reached ", n)
            print("starting radial motors")

            # make sure new K values have been written TODO: needed?
            sleep(2) # check if this can be shortened back to 0.5

            # switching on radial motors ...
            wait_for_all([
                a.adj_radial_on.set_target_value(1, hold=False) for a in self.adjs.values()
            ])

            # press a few times
            for _ in range(3):
                # ... and pushing go to ensure proper movements
                wait_for_all([
                    a.adj_radial_go.set_target_value(1, hold=False) for a in self.adjs.values()
                ])
                sleep(0.2)

#            # make sure the undulators finished moving TODO: needed?
#            sleep(5)

            if self.adjust_chic:
                print("CHIC adjustment follows")
#                self.chic.set_target_value(value, hold=False).wait() #TODO: test whether an additional sleep is needed
                self.phases.set(value)
                print("CHIC adjustment done")
            else:
                print("CHIC adjustment skipped")

            # make sure the undulators and phases finished moving TODO: needed?
            sleep(7)

        return self._as_task(change, hold=hold)


    def get_current_value(self):
        """Return the energy computed from the current K of the reference undulator."""
        n = self.und_name_cal
        a = self.adjs[n]
        k = a.get_current_value()
        energy = self.convert.E(k)

#        all_ks = [a.get_current_value() for a in self.adjs.values()]
#        checks = np.isclose(all_ks, k, rtol=0, atol=0.001)
#        if not all(checks):
#            print(f"Warning: Ks are not all close to {k}:")
#            for name, k, chk in zip(self.adjs.keys(), all_ks, checks):
#                if not chk:
#                    print(name, k)

        return energy


    def is_moving(self):
        """Return True if any undulator of the group reports that it is moving."""
        # self.adjs is a dict: iterate the Undulator objects, not the name keys
        return any(a.is_moving() for a in self.adjs.values())


    def unstuck(self):
        """Call unstuck() on every undulator of the group, one after the other."""
        for a in self.adjs.values():
            a.unstuck()



class Undulator(PVAdjustable):
    """
    Single undulator, adjusted via its K value.

    `name` is the PV prefix; the set value is "<name>:K_SET" and the readback
    is "<name>:K_READ". Further adjustables for the photon energy and the
    radial motor PVs are attached as attributes.
    """

    def __init__(self, name, accuracy=0.4):
        pvname_setvalue = name + ":K_SET"
        pvname_readback = name + ":K_READ"
        super().__init__(pvname_setvalue, pvname_readback=pvname_readback, accuracy=accuracy, active_move=True, name=name, internal=True)
        self.adj_energy = PVAdjustable(name + ":FELPHOTENE", internal=True)
        self.adj_radial_on = PVAdjustable(name + ":RADIAL-ON", internal=True)
        self.adj_radial_go = PVAdjustable(name + ":RADIAL-GO", internal=True)
#        self.adj_radial_on_proc = PVAdjustable(name + ":RADIAL-ON.PROC", internal=True)
#        self.adj_radial_go_proc = PVAdjustable(name + ":RADIAL-GO.PROC", internal=True)


    def set_target_value(self, value):
        """
        Set the K value, trying to recover from a non-zero readback status first.

        NOTE(review): this signature has no `hold` parameter although
        Undulators.set_target_value passes hold=False -- presumably the slic
        base class wraps this method; confirm against slic.
        """
        self.wait_for_no_alarm()
        # still in alarm after waiting: try to shake the undulator loose
        if self.pvs.readback.status != 0:
            self.unstuck()
        return super().set_target_value(value)


    def wait_for_no_alarm(self):
        """Poll the readback status up to 10 times (0.5 s apart) until it is 0."""
        for i in range(10):
            if self.pvs.readback.status == 0:
                break
            # print the timestamp only once, at the first failed check
            if i == 0:
                print(self.name, datetime.now())
            print(self.name, f"seems stuck ({i}) 💩")
            sleep(0.5)


    def unstuck(self, delta=0.0001):
        """
        Try to clear a non-zero readback status by nudging the set value.

        The set value is moved by `delta` and then back to its current value;
        the outcome is printed. Does nothing (besides printing) if the status
        is already 0.
        """
        if self.pvs.readback.status == 0:
            print(self.name, "is not stuck")
            return

        current = self.pvs.setvalue.get()
        temp = current + delta
        self._set_set_value(temp)
        self._set_set_value(current)

        if self.pvs.readback.status == 0:
            print(self.name, "is not stuck anymore 😍")
        else:
            print(self.name, "is still stuck 😭")


    def _set_set_value(self, value):
        """Write `value` to the set-value PV and run the radial off/on/go sequence."""
        self.pvs.setvalue.put(value)
        sleep(0.1)
        self.adj_radial_on.set_target_value(0)
        sleep(0.5)
        self.adj_radial_on.set_target_value(1)
        sleep(0.5)
        self.adj_radial_go.set_target_value(1)
        sleep(3)


    @property
    def energy(self):
        """Current value of the "<name>:FELPHOTENE" PV times 1000 (presumably keV -> eV; confirm)."""
        return self.adj_energy.get_current_value() * 1000



class ConverterEK:
    """
    Convert between energy and undulator K value.

    Both directions use E = const * gamma**2 / (1 + K**2 / 2), where gamma is
    the electron energy read from a PV divided by the electron rest energy.
    """

    h = 4.135667696e-15 # eV * s
    c = 299792458 # m / s
    lambda_u = 38e-3 # m
    const = 2 * h * c / lambda_u # eV

    electron_rest_energy = 0.51099895 # MeV


    # was: pvname_electron_energy="SATCL01-MBND100:P-READ"
    def __init__(self, pvname_electron_energy="SATCB01:ENE-FILT-OP"):
        self.pv_electron_energy = PV(pvname_electron_energy)

    def K(self, energy):
        """Return the K value for `energy`; the result is nan if the energy is not reachable (sqrt of a negative number)."""
        f = self.get_factor()
        v = f / energy - 1
        return np.sqrt(2 * v)

    def E(self, k_value):
        """Return the energy for the given K value."""
        f = self.get_factor()
        v = 1 + k_value**2 / 2
        return f / v

    def get_factor(self):
        """Return const * gamma**2 for the electron energy currently read from the PV."""
        return self.const * self.get_gamma_squared()

    def get_gamma_squared(self):
        """Return gamma**2, with gamma = electron energy / electron rest energy."""
        # assumes the PV delivers the electron energy in MeV (same unit as the rest energy) -- TODO confirm
        electron_energy = self.pv_electron_energy.get()
        gamma = electron_energy / self.electron_rest_energy
        return gamma**2



class ScalerEK:
    """Scale K values relative to the energy currently reported by a reference undulator."""

    def __init__(self, und_reference):
        self.und = und_reference

    def K(self, energy_target, K_current=None):
        """
        Return the K value(s) that scale `K_current` to `energy_target`.

        K_current may be a scalar or array-like; if None, the current value of
        the reference undulator is used. The result is
        sqrt(E_current / E_target * (K_current**2 + 2) - 2), evaluated
        element-wise, and is nan wherever the radicand is negative.
        """
        if K_current is None:
            K_current = self.und.get_current_value()
        K_current = np.asarray(K_current)
        energy_current = self.und.energy
        energy_ratio = energy_current / energy_target
        K_target_squared = energy_ratio * (K_current**2 + 2) - 2
        return np.sqrt(K_target_squared)



class CHIC(PVAdjustable):
    """
    Adjustable for the "SATUN-CHIC:PHOTON-ENERGY" PV.

    Values are exchanged with the caller multiplied by 1000 relative to the PV
    (presumably eV vs. keV; confirm). `fudge_offset` is subtracted from every
    target value before the conversion.
    """

    def __init__(self, fudge_offset, name, units):
        self.fudge_offset = fudge_offset
        name += " CHIC Energy"
        super().__init__("SATUN-CHIC:PHOTON-ENERGY", name=name)
        self.pvs.start = PV("SATUN-CHIC:APPLY-DELAY-OFFSET-PHASE")
        self.units = units


    def set_target_value(self, value, hold=False):
        """Write (value - fudge_offset) / 1000 to the set-value PV and trigger the start PV; returns the task from self._as_task."""
        fudge_offset = self.fudge_offset
        print("CHIC fudge offset is", fudge_offset)
        value -= fudge_offset
        value /= 1000

        def change():
            sleep(1)
            print("CHIC setvalue")
            self.pvs.setvalue.put(value, wait=True)
            print("CHIC start")
            self.pvs.start.put(1, wait=True)
            #TODO: test whether an additional sleep is needed
            sleep(1)

        return self._as_task(change, hold=hold)


    def get_current_value(self):
        """Return the PV value times 1000."""
        return super().get_current_value() * 1000



class TwoColorChicaneCurrent(PVAdjustable):
    """
    Two-color chicane magnet, adjusted via its current.

    With hysteresis_protection enabled, a target identical to the previous
    target is not sent again.
    """

    def __init__(
        self,
        pvname_setvalue="SATUN14-MBND100:I-SET",
        pvname_readback="SATUN14-MBND100:I-READ",
        name="Two Color Chicane as current",
        accuracy=0.1,
        process_time=1,
        hysteresis_protection=True,
        **kwargs
    ):
        super().__init__(
            pvname_setvalue,
            pvname_readback,
            name=name,
            accuracy=accuracy,
            process_time=process_time,
            **kwargs
        )
        self.hysteresis_protection = hysteresis_protection
        # passed to tqdm_sleep after triggering a cycle (presumably seconds; confirm)
        self.cycle_time = 250
        self.pv_cycle = PV("SATUN14-MBND100:CYCLE")
        # last target that was actually sent; None until the first move
        self.previous_target = None

    def set_target_value(self, value, **kwargs):
        """
        Set the current to `value`.

        Prints a warning if the target is below the current value. If
        hysteresis protection is on and `value` equals the previous target,
        only a message is printed and nothing is changed.
        """
        current = self.get_current_value()
        if value < current:
            cprint(f"Hysteresis Warning: target value {value} is smaller than the current value {current}", color="red")
        if self.hysteresis_protection and value == self.previous_target:
            cprint(f"target value {value} is identical to previous target value, will not try to change", color="cyan")
            return
        self.previous_target = value
        super().set_target_value(value, **kwargs)

    def cycle_magnet(self):
        """Trigger the cycle PV, wait for cycle_time, then step the current to 1 and 2."""
        self.pv_cycle.put(1, wait=True)
        tqdm_sleep(self.cycle_time)

        # set the current to 1 and 2 consecutively,
        # since the results after the first step always looks strange
        self.set_target_value(1).wait()
        sleep(1)
        self.set_target_value(2).wait()
        sleep(1)



class TwoColorChicaneDelay(Adjustable):
    """
    Two-color chicane, adjusted via a delay instead of a current.

    The conversion between delay and current is done by DelayCurrentConverter
    using the electron energy read from a PV; the actual move is delegated to
    a TwoColorChicaneCurrent.
    """

    def __init__(self, ID="TWO-COLOR-CHICANE-DELAY", name="Two Color Chicane as delay", pvname_electron_energy="SATCB01:ENE-FILT-OP", hysteresis_protection=True, units="fs", **kwargs):
        self.adj_current = TwoColorChicaneCurrent(hysteresis_protection=hysteresis_protection, internal=True)
        self.pv_electron_energy = PV(pvname_electron_energy)
        self.converter = DelayCurrentConverter()
        super().__init__(ID, name=name, units=units, **kwargs)

    def get_current_value(self):
        """Return the delay corresponding to the present chicane current."""
        current = self.adj_current.get_current_value()
        delay = self.calc_delay_fs(current)
        return delay

    def set_target_value(self, delay):
        """Convert `delay` to a current, print the pair, set the current and wait for it."""
        current = self.calc_current_A(delay)
        print(f"{current} A <- {delay} fs")
        self.adj_current.set_target_value(current).wait()

    def is_moving(self):
        """Return whether the underlying current adjustable is moving."""
        return self.adj_current.is_moving()

    def cycle_magnet(self):
        """Cycle the magnet via the underlying current adjustable."""
        self.adj_current.cycle_magnet()

    def calc_current_A(self, delay_fs):
        """Return the current for `delay_fs` at the electron energy currently read from the PV."""
        electron_energy = self.pv_electron_energy.get()
        return self.converter.current_A(delay_fs, electron_energy)

    def calc_delay_fs(self, current_A):
        """Return the delay for `current_A` at the electron energy currently read from the PV."""
        electron_energy = self.pv_electron_energy.get()
        return self.converter.delay_fs(current_A, electron_energy)



class Phases:
    """
    Auto-phasing switches of the undulator chicanes plus the shared photon-energy PVs.

    `n_unds` are the active undulator numbers ("good"); all other numbers from
    N_UNDS are "bad". Number 22 is excluded from both groups.
    """

    def __init__(self, n_unds=N_UNDS):
        # 22 does not have a chicane
        # list() copies any iterable, so the caller's sequence (or the shared default) is never modified
        n_unds = list(n_unds)
        if 22 in n_unds:
            n_unds.remove(22)

        # 22 does not have a chicane
        n_unds_all = N_UNDS.copy()
        if 22 in n_unds_all:
            n_unds_all.remove(22)

        self.cb_auto_good = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in n_unds}
        self.cb_auto_bad = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in set(n_unds_all) - set(n_unds)}

        self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY")
        self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY")


    def set(self, energy):
        """
        Apply `energy` to the phases.

        Sequence: switch auto-phasing off everywhere, enable the fixed-energy
        flag, write energy / 1000 to the photon-energy PV, pulse auto-phasing
        on for the "good" chicanes, then switch auto-phasing off everywhere
        again. The fixed-energy flag is left enabled.
        """
        for cb in self.cb_auto_good.values():
            cb.put(int(False))
        for cb in self.cb_auto_bad.values():
            cb.put(int(False))
        sleep(0.1)

        pv_fixed_energy = self.pv_fixed_energy
        # only needed for the (currently disabled) reset at the end
        current_state = pv_fixed_energy.get()
        pv_fixed_energy.put(int(True)) # enforce fixed energy

        self.pv_energy.put(energy / 1000) # in keV
        sleep(0.1)

        for cb in self.cb_auto_good.values():
            cb.put(int(True))
        sleep(0.1)

        for cb in self.cb_auto_good.values():
            cb.put(int(False))
        for cb in self.cb_auto_bad.values():
            cb.put(int(False))
        sleep(0.1)

#        pv_fixed_energy.put(current_state) # reset to original state



#class Phases:

#    def __init__(self, n_unds=N_UNDS):
#        # 22 does not have a chicane
#        n_unds = n_unds.copy()
#        if 22 in n_unds:
#            n_unds.remove(22)

#        self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY")
#        self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY")

#        self.adjs_apply = {i: PVAdjustable(f"SATUN{i:02}-CHIC:APPLY-DOP", internal=True) for i in n_unds}


#    def set(self, energy):
#        pv_energy = self.pv_energy
#        pv_fixed_energy = self.pv_fixed_energy

#        current_state = pv_fixed_energy.get()
#        pv_fixed_energy.put(int(True)) # enforce fixed energy

#        pv_energy.put(energy / 1000) # in keV
#        sleep(0.1)
#        self.update()
##        sleep(10)

#        pv_fixed_energy.put(current_state) # reset to original state


#    def update(self):
#        wait_for_all([
#            adj_apply.set_target_value(1) for adj_apply in self.adjs_apply.values()
#        ])



def get_machine_n_und_ref():
    """
    Read the number of the reference undulator currently used by the machine.

    The "SATUN:REF-UND" enum PV is read up to 3 times (0.1 s apart) until it
    delivers something other than None.

    Returns:
        The integer following the "SATUN" prefix of the PV value, or None if
        the value does not start with "SATUN" or the remainder is not an
        integer.

    Raises:
        ValueError: if every read attempt returned None.
    """
    n_tries = 3
    for i in range(n_tries):
        res = PVEnumAdjustable("SATUN:REF-UND", internal=True).get()
        if res is not None:
            break
        print(f"try {i+1}/{n_tries}: got None instead of reference undulator number")
        sleep(0.1)

    if res is None:
        raise ValueError(f"got None instead of reference undulator number {n_tries} times")

    if not res.startswith("SATUN"):
        return None

    # strip the "SATUN" prefix; the rest is expected to be the number
    n = len("SATUN")
    res = res[n:]

    try:
        res = int(res)
    except ValueError:
        return None

    return res