Files
furka/undulator.py
2023-07-24 11:55:55 +02:00

234 lines
7.3 KiB
Python

from time import sleep
import numpy as np
from epics import PV
from logzero import logger as log
from slic.core.adjustable import Adjustable
from slic.core.adjustable import PVAdjustable
from slic.core.scanner.scanbackend import wait_for_all #, stop_all
# Name template of an undulator; formatted with the undulator number
# (zero-padded to two digits). PV names are built by appending a suffix.
UND_NAME_FMT = "SATUN{:02}-UIND030"
# Undulator number that denotes the CHIC; Undulators removes it from the
# list of active undulators (with a warning) if it is passed in.
N_UND_CHIC = 14
class Undulators(Adjustable):
    """Several undulators adjusted together as one photon-energy adjustable.

    A target photon energy is translated into one K value per active
    undulator, all undulators are set, and optionally the CHIC is set
    afterwards.

    Parameters:
        n_unds: iterable of undulator numbers to drive (copied, the caller's
            object is not modified). The CHIC number (N_UND_CHIC) is removed
            with a warning if present.
        n_und_ref: number of the reference undulator. Its K value is used to
            report the current energy and it is the reference of the scaler.
        chic_fudge_offset: offset handed to the CHIC adjustable.
        adjust_chic: if True, the CHIC is set after the undulators.
        scaled: if True, target Ks are computed by ScalerEK from the current
            set values; otherwise every undulator gets the same K computed
            by ConverterEK.
        ID, name, units: forwarded to Adjustable; name and units are also
            handed to the CHIC adjustable.

    Raises:
        ValueError: if the reference undulator is not among the active
            undulators (after the CHIC has been removed).
    """

    def __init__(self, n_unds, n_und_ref, chic_fudge_offset=0, adjust_chic=True, scaled=True, ID="ATHOS_UNDULATORS", name="Athos Undulators", units="eV"):
        super().__init__(ID, name=name, units=units)

        # work on a private copy so that removing the CHIC below cannot
        # modify the sequence owned by the caller
        self.n_unds = n_unds = list(n_unds)
        self.n_und_ref = n_und_ref

        # Remove the CHIC *before* validating the reference: a reference equal
        # to N_UND_CHIC would otherwise pass the check and only fail later
        # with a KeyError when the reference adjustable is looked up.
        if N_UND_CHIC in n_unds:
            log.warning(f"the CHIC ({N_UND_CHIC}) is in the list of active undulators: {n_unds}, and will be ignored/removed")
            n_unds.remove(N_UND_CHIC)

        if n_und_ref not in n_unds:
            raise ValueError(f"the reference undulator ({n_und_ref}) is not in the list of active undulators: {n_unds}")

        self.und_names = und_names = [UND_NAME_FMT.format(n) for n in n_unds]
        self.und_name_cal = und_name_cal = UND_NAME_FMT.format(n_und_ref)

        # one Undulator adjustable per active undulator, keyed by its name
        self.adjs = {und_name: Undulator(und_name) for und_name in und_names}
        self.chic = CHIC(chic_fudge_offset, name, units)

        self.adjust_chic = adjust_chic
        self.scaled = scaled

        self.convert = ConverterEK()
        self.scale = ScalerEK(self.adjs[und_name_cal])

    def set_target_value(self, value, hold=False):
        """Set all undulators (and optionally the CHIC) for the energy `value`.

        Returns the task created by `_as_task`. If the K value computed for
        `value` is nan, a message is printed, nothing is changed and None is
        returned instead of a task.
        """
        k = self.convert.K(value)
        if np.isnan(k):
            print("K is nan for", value)
            return
        print(f"{k} <- {value}")

        # current *set* values (not readbacks) of all undulators
        ks_current = [a.get_current_value(readback=False) for a in self.adjs.values()]

        if self.scaled:
            header = "scaled: "
            ks_target = self.scale.K(value, ks_current)
        else:
            header = "all equal:"
            ks_target = k * np.ones_like(ks_current)

        print(header, ks_target)
        print()

        def change():
            #TODO: replace by set_all_target_values_and_wait when print not needed anymore
            tasks = []
            for (name, a), k_old, k_new in zip(self.adjs.items(), ks_current, ks_target):
                delta = k_old - k_new
                print(f"{name}: {k_old}\t->\t{k_new}\t({delta})")
                if np.isnan(k_new):
                    print(f"{name} skipped since target K is nan")
                    continue
                t = a.set_target_value(k_new, hold=False)
                tasks.append(t)
            wait_for_all(tasks)

            # the CHIC is only touched once all undulators are done
            if self.adjust_chic:
                print("CHIC adjustment follows")
                self.chic.set_target_value(value, hold=False).wait() #TODO: test whether an additional sleep is needed
                print("CHIC adjustment done")

        return self._as_task(change, hold=hold)

    def get_current_value(self):
        """Return the energy computed from the K of the reference undulator."""
        n = self.und_name_cal
        a = self.adjs[n]
        k = a.get_current_value()
        energy = self.convert.E(k)

        # all_ks = [a.get_current_value() for a in self.adjs.values()]
        # checks = np.isclose(all_ks, k, rtol=0, atol=0.001)
        # if not all(checks):
        #     print(f"Warning: Ks are not all close to {k}:")
        #     for name, k, chk in zip(self.adjs.keys(), all_ks, checks):
        #         if not chk:
        #             print(name, k)

        return energy

    def is_moving(self):
        """Return True if any of the undulators reports that it is moving."""
        # self.adjs is a dict: iterate its values (the adjustables), since
        # iterating the dict itself yields the name strings
        return any(a.is_moving() for a in self.adjs.values())
class Undulator(PVAdjustable):
    """One undulator, adjusted through its K set value / K readback PVs.

    `name` is the PV prefix; the suffixes ":K_SET", ":K_READ" and
    ":FELPHOTENE" are appended to it.
    """

    def __init__(self, name, accuracy=0.5):
        super().__init__(
            name + ":K_SET",
            pvname_readback=name + ":K_READ",
            accuracy=accuracy,
            active_move=True,
            name=name,
            internal=True,
        )
        # separate adjustable for the photon energy PV of this undulator
        self.adj_energy = PVAdjustable(name + ":FELPHOTENE", internal=True)

    @property
    def energy(self):
        """Current value of the photon energy PV multiplied by 1000."""
        # presumably converts keV to eV -- TODO confirm the units of the PV
        raw = self.adj_energy.get_current_value()
        return raw * 1000
class ConverterEK:
    """Convert between photon energy E and undulator parameter K.

    Implements E = const * gamma**2 / (1 + K**2 / 2) and its inverse, where
    gamma is the electron energy read from a PV divided by the electron rest
    energy.
    """

    h = 4.135667696e-15 # eV * s
    c = 299792458 # m / s
    lambda_u = 38e-3 # m
    const = 2 * h * c / lambda_u # eV
    electron_rest_energy = 0.51099895 # MeV

    # changed from pvname_electron_energy="SATCL01-MBND100:P-READ" to:
    def __init__(self, pvname_electron_energy="SATCB01:ENE-FILT-OP"):
        self.pv_electron_energy = PV(pvname_electron_energy)

    def get_gamma_squared(self):
        """Return gamma**2 from the current value of the electron energy PV."""
        # presumably the PV is in MeV, like electron_rest_energy -- TODO confirm
        return (self.pv_electron_energy.get() / self.electron_rest_energy) ** 2

    def get_factor(self):
        """Return const * gamma**2, i.e. the photon energy for K = 0."""
        return self.const * self.get_gamma_squared()

    def K(self, energy):
        """Return the K value that corresponds to the photon energy `energy`."""
        ratio = self.get_factor() / energy
        return np.sqrt(2 * (ratio - 1))

    def E(self, k_value):
        """Return the photon energy that corresponds to the K value `k_value`."""
        return self.get_factor() / (1 + k_value**2 / 2)
class ScalerEK:
    """Scale K values from the current energy of a reference undulator.

    `und_reference` must provide an `energy` attribute and a
    `get_current_value()` method.
    """

    def __init__(self, und_reference):
        self.und = und_reference

    def K(self, energy_target, K_current=None):
        """Return the K value(s) needed to reach `energy_target`.

        K_current may be a scalar or a sequence of K values; if omitted, the
        current value of the reference undulator is used. The result follows
        from keeping E * (K**2 + 2) constant.
        """
        ks = self.und.get_current_value() if K_current is None else K_current
        ks = np.asarray(ks)
        ratio = self.und.energy / energy_target
        return np.sqrt(ratio * (ks**2 + 2) - 2)
class CHIC(PVAdjustable):
    """Adjustable for the CHIC photon energy PV.

    Parameters:
        fudge_offset: value subtracted from every target value before it is
            written to the PV.
        name: base name; " CHIC Energy" is appended.
        units: stored as the `units` attribute after the base class has been
            initialized.
    """

    def __init__(self, fudge_offset, name, units):
        self.fudge_offset = fudge_offset
        name += " CHIC Energy"
        super().__init__("SATUN-CHIC:PHOTON-ENERGY", name=name)
        # additional PV that is written after the set value
        self.pvs.start = PV("SATUN-CHIC:APPLY-DELAY-OFFSET-PHASE")
        self.units = units

    def set_target_value(self, value, hold=False):
        """Write (value - fudge_offset) / 1000 to the set value PV and apply it.

        Returns the task created by `_as_task`.
        """
        fudge_offset = self.fudge_offset
        print("CHIC fudge offset is", fudge_offset)

        # Build a new object instead of using "-=" / "/=": the in-place
        # operators would modify a mutable argument (e.g. a numpy array)
        # owned by the caller.
        # presumably the division converts eV to keV -- TODO confirm PV units
        value = (value - fudge_offset) / 1000

        def change():
            sleep(1)
            print("CHIC setvalue")
            self.pvs.setvalue.put(value, wait=True)
            print("CHIC start")
            self.pvs.start.put(1, wait=True)
            #TODO: test whether an additional sleep is needed
            sleep(1)

        return self._as_task(change, hold=hold)

    def get_current_value(self):
        """Return the base class value multiplied by 1000.

        Note that the fudge offset is not added back here.
        """
        return super().get_current_value() * 1000
class Mono(PVAdjustable):
    """Monochromator energy adjustable.

    `pv_mono_name` is the PV prefix: ":SetEnergy" is used as set value PV and
    ":MOVING" as done-moving PV. No readback PV is configured.
    """

    def __init__(self, pv_mono_name, mono_name):
        self.pv_name = pv_mono_name
        #pvname_readback = name + ":photonenergy"
        super().__init__(
            pv_mono_name + ":SetEnergy",
            pvname_done_moving=pv_mono_name + ":MOVING",
            name=mono_name,
        )
class Coupled_MonoUnd(Adjustable):
    """Monochromator and undulators moved together.

    The monochromator is set to the target energy, the undulators to the
    target energy plus `delta`.

    Parameters:
        n_unds, n_und_ref, chic_fudge_offset, adjust_chic, scaled: forwarded
            to Undulators.
        ID, name, units: forwarded to Adjustable.
        unds_name: name of the Undulators adjustable.
        pv_mono_name, mono_name: forwarded to Mono.
        delta: offset added to the target value for the undulators.
    """

    def __init__(self, n_unds, n_und_ref, chic_fudge_offset=0, adjust_chic=True, scaled=True, ID="ATHOS_Mon_Und", unds_name="Athos Undulators", units="eV", pv_mono_name="", mono_name="", delta=0, name="" ):
        super().__init__(ID, name=name, units=units)
        self.mono = Mono(pv_mono_name, mono_name)
        # adjust_chic and scaled have to be handed on explicitly, otherwise
        # the values given by the caller are silently ignored
        self.und = Undulators(n_unds, n_und_ref, chic_fudge_offset, adjust_chic=adjust_chic, scaled=scaled, name=unds_name)
        self.delta = delta

    def set_target_value(self, value):
        """Start both moves, then block until both are finished."""
        tm = self.mono.set_target_value(value)
        tu = self.und.set_target_value(value + self.delta)
        tm.wait()
        # NOTE(review): Undulators.set_target_value returns None when the
        # computed K is nan, in which case the next line raises AttributeError
        tu.wait()

    def get_current_value(self):
        """Return the current value of the monochromator."""
        return self.mono.get_current_value()

    def is_moving(self):
        """Return True if the monochromator or the undulators are moving."""
        return self.mono.is_moving() or self.und.is_moving()