506 lines
15 KiB
Python
506 lines
15 KiB
Python
from time import sleep
|
|
from datetime import datetime
|
|
import numpy as np
|
|
from epics import PV
|
|
|
|
from logzero import logger as log
|
|
|
|
from slic.core.adjustable import Adjustable, PVAdjustable, PVEnumAdjustable
|
|
from slic.core.scanner.scanbackend import wait_for_all #, stop_all
|
|
from slic.utils import cprint, tqdm_sleep
|
|
|
|
from .delay_current import DelayCurrentConverter
|
|
|
|
|
|
# PV name prefix template of an undulator; formatted with the two-digit undulator number
UND_NAME_FMT = "SATUN{:02}-UIND030"

# position number of the CHIC, which is excluded from the undulator numbers below
N_UND_CHIC = 14

# undulator numbers 6 to 22 (inclusive) without the CHIC position
N_UNDS = [n for n in range(6, 22 + 1) if n != N_UND_CHIC]
|
|
|
|
|
|
|
|
class Undulators(Adjustable):
    """
    Group of undulators driven together as one adjustable whose value is an energy (default units: eV).

    for n_und_ref=None (default), the reference undulator currently used by the machine will be used

    Parameters:
        n_unds:            iterable of undulator numbers to control (copied; the CHIC number is removed with a warning)
        n_und_ref:         number of the reference undulator; must be contained in n_unds
        chic_fudge_offset: currently unused (only referenced by the commented-out CHIC adjustable)
        adjust_chic:       if True, the phases are set to the target energy after the undulators were changed
        scaled:            if True, the target K values are scaled from the current K of each undulator,
                           otherwise all undulators get the same K
        ID, name, units:   forwarded to Adjustable

    Raises:
        ValueError: if no reference undulator can be determined, or if it is not in the list of active undulators
    """

    def __init__(self, n_unds=N_UNDS, n_und_ref=None, chic_fudge_offset=0, adjust_chic=False, scaled=True, ID="ATHOS_UNDULATORS", name="Athos Undulators", units="eV"):
        super().__init__(ID, name=name, units=units)

        machine_n_und_ref = get_machine_n_und_ref()

        if n_und_ref is None:
            if machine_n_und_ref is None:
                raise ValueError("could not read reference undulator currently used by the machine, please specify n_und_ref")
            n_und_ref = machine_n_und_ref

        if n_und_ref != machine_n_und_ref:
            log.warning(f"the chosen reference undulator ({n_und_ref}) is not the reference undulator currently used by the machine ({machine_n_und_ref})", stacklevel=3)

        # work on a copy, so that the (shared, module-level) default list is never modified
        n_unds = list(n_unds)

        if N_UND_CHIC in n_unds:
            log.warning(f"the CHIC ({N_UND_CHIC}) is in the list of active undulators: {n_unds}, and will be ignored/removed", stacklevel=3)
            n_unds.remove(N_UND_CHIC)

        if n_und_ref not in n_unds:
            raise ValueError(f"the reference undulator ({n_und_ref}) is not in the list of active undulators: {n_unds}")

        self.n_unds = n_unds
        self.n_und_ref = n_und_ref

        self.und_names = und_names = [UND_NAME_FMT.format(n) for n in n_unds]
        self.und_name_cal = und_name_cal = UND_NAME_FMT.format(n_und_ref)

        # one Undulator adjustable per undulator, keyed by its PV name prefix
        self.adjs = {und_name: Undulator(und_name) for und_name in und_names}
#        self.chic = CHIC(chic_fudge_offset, name, units)
        self.phases = Phases(n_unds)

        self.adjust_chic = adjust_chic
        self.scaled = scaled

        self.convert = ConverterEK()

        # the scaler reads the current energy from the reference undulator
        a = self.adjs[und_name_cal]
        self.scale = ScalerEK(a)


    def set_target_value(self, value, hold=False):
        """
        Change all undulators to the K values corresponding to the energy value.

        Returns the result of self._as_task(change, hold=hold),
        or None (without changing anything) if the K calculated for value is nan.
        """
        k = self.convert.K(value)
        if np.isnan(k):
            print("K is nan for", value)
            return
        print(f"{k} <- {value}")

        # set values (not readbacks) are the starting point for the new K values
        ks_current = [a.get_current_value(readback=False) for a in self.adjs.values()]

        if self.scaled:
            header = "scaled: "
            ks_target = self.scale.K(value, ks_current)
        else:
            header = "all equal:"
            ks_target = k * np.ones_like(ks_current)

        print(header, ks_target)
        print()

        def change():
            #TODO: replace by set_all_target_values_and_wait when print not needed anymore
            tasks = {}
            for (name, a), k_old, k_new in zip(self.adjs.items(), ks_current, ks_target):
                delta = k_old - k_new
                print(f"{name}: {k_old}\t->\t{k_new}\t({delta})")
                if np.isnan(k_new):
                    print(f"{name} skipped since target K is nan")
                    continue
                t = a.set_target_value(k_new, hold=False)
                tasks[name] = t
#            wait_for_all(tasks)
            for n, t in tasks.items():
                print("waiting for ", n)
                t.wait()
                print("target reached ", n)

            print("starting radial motors")

            # make sure new K values have been written TODO: needed?
            sleep(2) # check if this can be shortened back to 0.5

            # switching on radial motors ...
            wait_for_all([
                a.adj_radial_on.set_target_value(1, hold=False) for a in self.adjs.values()
            ])

            # press a few times
            for _ in range(3):
                # ... and pushing go to ensure proper movements
                wait_for_all([
                    a.adj_radial_go.set_target_value(1, hold=False) for a in self.adjs.values()
                ])
                sleep(0.2)

#            # make sure the undulators finished moving TODO: needed?
#            sleep(5)

            if self.adjust_chic:
                print("CHIC adjustment follows")
#                self.chic.set_target_value(value, hold=False).wait() #TODO: test whether an additional sleep is needed
                self.phases.set(value)
                print("CHIC adjustment done")
            else:
                print("CHIC adjustment skipped")

            # make sure the undulators and phases finished moving TODO: needed?
            sleep(7)

        return self._as_task(change, hold=hold)


    def get_current_value(self):
        """Return the energy calculated from the current K value of the reference undulator."""
        n = self.und_name_cal
        a = self.adjs[n]
        k = a.get_current_value()
        energy = self.convert.E(k)

#        all_ks = [a.get_current_value() for a in self.adjs.values()]
#        checks = np.isclose(all_ks, k, rtol=0, atol=0.001)
#        if not all(checks):
#            print(f"Warning: Ks are not all close to {k}:")
#            for name, k, chk in zip(self.adjs.keys(), all_ks, checks):
#                if not chk:
#                    print(name, k)

        return energy


    def is_moving(self):
        """Return True if any of the undulators reports that it is moving."""
        # iterate over the adjustables (dict values); iterating the dict itself would yield the name strings
        return any(a.is_moving() for a in self.adjs.values())


    def unstuck(self):
        """Call unstuck() on every undulator."""
        for a in self.adjs.values():
            a.unstuck()
|
|
|
|
|
|
|
|
class Undulator(PVAdjustable):
    """
    K value of a single undulator as adjustable.

    name is the PV name prefix; the set value PV is name + ":K_SET", the readback PV is name + ":K_READ".
    Additionally holds adjustables for the ":FELPHOTENE", ":RADIAL-ON" and ":RADIAL-GO" PVs.
    """

    def __init__(self, name, accuracy=0.4):
        super().__init__(
            f"{name}:K_SET",
            pvname_readback=f"{name}:K_READ",
            accuracy=accuracy,
            active_move=True,
            name=name,
            internal=True
        )
        self.adj_energy    = PVAdjustable(f"{name}:FELPHOTENE", internal=True)
        self.adj_radial_on = PVAdjustable(f"{name}:RADIAL-ON", internal=True)
        self.adj_radial_go = PVAdjustable(f"{name}:RADIAL-GO", internal=True)
#        self.adj_radial_on_proc = PVAdjustable(name + ":RADIAL-ON.PROC", internal=True)
#        self.adj_radial_go_proc = PVAdjustable(name + ":RADIAL-GO.PROC", internal=True)


    def set_target_value(self, value):
        """Wait for the readback status to clear, try to unstuck if it did not, then set the new K value."""
        self.wait_for_no_alarm()
        still_in_alarm = self.pvs.readback.status != 0
        if still_in_alarm:
            self.unstuck()
        return super().set_target_value(value)


    def wait_for_no_alarm(self):
        """Poll the readback status up to 10 times (0.5 s apart) until it is 0."""
        for attempt in range(10):
            if self.pvs.readback.status == 0:
                return
            # time stamp only once, before the first "stuck" message
            if attempt == 0:
                print(self.name, datetime.now())
            print(self.name, f"seems stuck ({attempt}) 💩")
            sleep(0.5)


    def unstuck(self, delta=0.0001):
        """If the readback status is not 0, write set value + delta and then the original set value again."""
        if self.pvs.readback.status == 0:
            print(self.name, "is not stuck")
            return

        original = self.pvs.setvalue.get()
        # nudge away and back
        for k in (original + delta, original):
            self._set_set_value(k)

        verdict = "is not stuck anymore 😍" if self.pvs.readback.status == 0 else "is still stuck 😭"
        print(self.name, verdict)


    def _set_set_value(self, value):
        """Write value to the set value PV, then toggle radial on (0, 1) and press radial go, with pauses."""
        self.pvs.setvalue.put(value)
        sleep(0.1)
        sequence = (
            (self.adj_radial_on, 0, 0.5),
            (self.adj_radial_on, 1, 0.5),
            (self.adj_radial_go, 1, 3),
        )
        for adj, target, pause in sequence:
            adj.set_target_value(target)
            sleep(pause)


    @property
    def energy(self):
        """Current value of the ":FELPHOTENE" adjustable multiplied by 1000."""
        raw = self.adj_energy.get_current_value()
        return raw * 1000
|
|
|
|
|
|
|
|
class ConverterEK:
    """
    Conversion between energy E and K value via E = factor / (1 + K**2 / 2),
    where factor = const * gamma**2 and gamma is the electron energy read from a PV
    divided by the electron rest energy.
    """

    h = 4.135667696e-15 # eV * s
    c = 299792458 # m / s
    lambda_u = 38e-3 # m
    const = 2 * h * c / lambda_u # eV

    electron_rest_energy = 0.51099895 # MeV

    # was: pvname_electron_energy="SATCL01-MBND100:P-READ"
    def __init__(self, pvname_electron_energy="SATCB01:ENE-FILT-OP"):
        self.pv_electron_energy = PV(pvname_electron_energy)

    def K(self, energy):
        """Return the K value for the given energy (inverse of E)."""
        half_k_squared = self.get_factor() / energy - 1
        return np.sqrt(2 * half_k_squared)

    def E(self, k_value):
        """Return the energy for the given K value (inverse of K)."""
        factor = self.get_factor()
        denominator = 1 + k_value**2 / 2
        return factor / denominator

    def get_factor(self):
        """Return const * gamma**2 for the electron energy currently read from the PV."""
        return self.const * self.get_gamma_squared()

    def get_gamma_squared(self):
        """Return the square of (electron energy read from the PV / electron rest energy)."""
        gamma = self.pv_electron_energy.get() / self.electron_rest_energy
        return gamma**2
|
|
|
|
|
|
|
|
class ScalerEK:
    """
    Scale K values from the current energy of a reference undulator to a target energy.

    und_reference needs to provide get_current_value() and an energy attribute.
    """

    def __init__(self, und_reference):
        self.und = und_reference

    def K(self, energy_target, K_current=None):
        """
        Return the K value(s) for energy_target, calculated from K_current
        (default: current value of the reference undulator) as
        sqrt(ratio * (K_current**2 + 2) - 2) with ratio = current energy / energy_target.
        """
        if K_current is None:
            K_current = self.und.get_current_value()
        ks = np.asarray(K_current)
        ratio = self.und.energy / energy_target
        return np.sqrt(ratio * (ks**2 + 2) - 2)
|
|
|
|
|
|
|
|
class CHIC(PVAdjustable):
    """
    Adjustable for the "SATUN-CHIC:PHOTON-ENERGY" PV.

    Values are multiplied by 1000 when read and divided by 1000 when written;
    fudge_offset is subtracted from the target value before writing.
    """

    def __init__(self, fudge_offset, name, units):
        self.fudge_offset = fudge_offset
        super().__init__("SATUN-CHIC:PHOTON-ENERGY", name=name + " CHIC Energy")
        # additional PV that is written after the new set value
        self.pvs.start = PV("SATUN-CHIC:APPLY-DELAY-OFFSET-PHASE")
        self.units = units


    def set_target_value(self, value, hold=False):
        """Write the converted value to the set value PV, then write 1 to the start PV (both waiting)."""
        offset = self.fudge_offset
        print("CHIC fudge offset is", offset)
        value -= offset
        value /= 1000

        def apply():
            sleep(1)
            print("CHIC setvalue")
            self.pvs.setvalue.put(value, wait=True)
            print("CHIC start")
            self.pvs.start.put(1, wait=True)
            #TODO: test whether an additional sleep is needed
            sleep(1)

        return self._as_task(apply, hold=hold)


    def get_current_value(self):
        """Return the current value of the parent class multiplied by 1000."""
        raw = super().get_current_value()
        return raw * 1000
|
|
|
|
|
|
|
|
|
|
|
|
class TwoColorChicaneCurrent(PVAdjustable):
    """
    Current of the two color chicane ("SATUN14-MBND100") as adjustable.

    With hysteresis_protection enabled, a target value identical to the previous target is not set again.
    """

    def __init__(
        self,
        pvname_setvalue="SATUN14-MBND100:I-SET", pvname_readback="SATUN14-MBND100:I-READ", name="Two Color Chicane as current",
        accuracy=0.1, process_time=1,
        hysteresis_protection=True,
        **kwargs
    ):
        super().__init__(pvname_setvalue, pvname_readback, name=name, accuracy=accuracy, process_time=process_time, **kwargs)
        self.hysteresis_protection = hysteresis_protection

        # duration handed to tqdm_sleep after triggering a cycle
        self.cycle_time = 250
        self.pv_cycle = PV("SATUN14-MBND100:CYCLE")
        self.previous_target = None


    def set_target_value(self, value, **kwargs):
        """Set the current to value; print a warning if value is smaller than the current value."""
        present = self.get_current_value()
        if value < present:
            cprint(f"Hysteresis Warning: target value {value} is smaller than the current value {present}", color="red")

        repeated_target = self.hysteresis_protection and value == self.previous_target
        if repeated_target:
            cprint(f"target value {value} is identical to previous target value, will not try to change", color="cyan")
            return

        self.previous_target = value
        super().set_target_value(value, **kwargs)


    def cycle_magnet(self):
        """Write 1 to the cycle PV, wait for cycle_time, then set the current to 1 and 2."""
        self.pv_cycle.put(1, wait=True)
        tqdm_sleep(self.cycle_time)
        # set the current to 1 and 2 consecutively,
        # since the results after the first step always looks strange
        for amps in (1, 2):
            self.set_target_value(amps).wait()
            sleep(1)
|
|
|
|
|
|
|
|
class TwoColorChicaneDelay(Adjustable):
    """
    Two color chicane as delay (default units: fs).

    Wraps a TwoColorChicaneCurrent and converts between delay and current with a DelayCurrentConverter,
    using the electron energy read from a PV.
    """

    def __init__(self, ID="TWO-COLOR-CHICANE-DELAY", name="Two Color Chicane as delay", pvname_electron_energy="SATCB01:ENE-FILT-OP", hysteresis_protection=True, units="fs", **kwargs):
        self.adj_current = TwoColorChicaneCurrent(hysteresis_protection=hysteresis_protection, internal=True)
        self.pv_electron_energy = PV(pvname_electron_energy)
        self.converter = DelayCurrentConverter()
        super().__init__(ID, name=name, units=units, **kwargs)

    def get_current_value(self):
        """Return the delay calculated from the current value of the wrapped current adjustable."""
        amps = self.adj_current.get_current_value()
        return self.calc_delay_fs(amps)

    def set_target_value(self, delay):
        """Convert delay to a current, set it on the wrapped current adjustable and wait."""
        amps = self.calc_current_A(delay)
        print(f"{amps} A <- {delay} fs")
        self.adj_current.set_target_value(amps).wait()

    def is_moving(self):
        """Forward to the wrapped current adjustable."""
        return self.adj_current.is_moving()

    def cycle_magnet(self):
        """Forward to the wrapped current adjustable."""
        self.adj_current.cycle_magnet()

    def calc_current_A(self, delay_fs):
        """Convert delay_fs to a current for the electron energy currently read from the PV."""
        energy_now = self.pv_electron_energy.get()
        return self.converter.current_A(delay_fs, energy_now)

    def calc_delay_fs(self, current_A):
        """Convert current_A to a delay for the electron energy currently read from the PV."""
        energy_now = self.pv_electron_energy.get()
        return self.converter.delay_fs(current_A, energy_now)
|
|
|
|
|
|
|
|
|
|
|
|
class Phases:
    """
    Holds the "AUTO-PHASING" PVs of the chicanes plus the common energy PVs.

    cb_auto_good contains the PVs for the given undulator numbers,
    cb_auto_bad those for the remaining numbers of N_UNDS (22 is excluded from both).
    """

    def __init__(self, n_unds=N_UNDS):
        # 22 does not have a chicane
        selected = self._copy_without_22(n_unds)
        everything = self._copy_without_22(N_UNDS)
        unselected = set(everything) - set(selected)

        self.cb_auto_good = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in selected}
        self.cb_auto_bad  = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in unselected}

        self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY")
        self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY")


    @staticmethod
    def _copy_without_22(numbers):
        """Return a copy of the list numbers with the first occurrence of 22 removed (if present)."""
        numbers = numbers.copy()
        if 22 in numbers:
            numbers.remove(22)
        return numbers


    def set(self, energy):
        """
        Write energy / 1000 to the energy PV with fixed energy enforced,
        switching the auto phasing PVs of the selected undulators on once and off again.
        """
        def put_all(pvs, state):
            for cb in pvs.values():
                cb.put(int(state))

        put_all(self.cb_auto_good, False)
        put_all(self.cb_auto_bad, False)
        sleep(0.1)

        fixed = self.pv_fixed_energy

        # only needed for the (currently disabled) reset at the end
        current_state = fixed.get()
        fixed.put(int(True)) # enforce fixed energy

        self.pv_energy.put(energy / 1000) # in keV
        sleep(0.1)

        put_all(self.cb_auto_good, True)
        sleep(0.1)

        put_all(self.cb_auto_good, False)
        put_all(self.cb_auto_bad, False)
        sleep(0.1)

#        pv_fixed_energy.put(current_state) # reset to original state
|
|
|
|
|
|
|
|
|
|
|
|
#class Phases:
|
|
|
|
# def __init__(self, n_unds=N_UNDS):
|
|
# # 22 does not have a chicane
|
|
# n_unds = n_unds.copy()
|
|
# if 22 in n_unds:
|
|
# n_unds.remove(22)
|
|
# self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY")
|
|
# self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY")
|
|
# self.adjs_apply = {i: PVAdjustable(f"SATUN{i:02}-CHIC:APPLY-DOP", internal=True) for i in n_unds}
|
|
|
|
|
|
# def set(self, energy):
|
|
# pv_energy = self.pv_energy
|
|
# pv_fixed_energy = self.pv_fixed_energy
|
|
|
|
# current_state = pv_fixed_energy.get()
|
|
# pv_fixed_energy.put(int(True)) # enforce fixed energy
|
|
|
|
# pv_energy.put(energy / 1000) # in keV
|
|
|
|
# sleep(0.1)
|
|
# self.update()
|
|
## sleep(10)
|
|
|
|
# pv_fixed_energy.put(current_state) # reset to original state
|
|
|
|
|
|
# def update(self):
|
|
# wait_for_all([
|
|
# adj_apply.set_target_value(1) for adj_apply in self.adjs_apply.values()
|
|
# ])
|
|
|
|
|
|
|
|
|
|
|
|
def get_machine_n_und_ref():
    """
    Read "SATUN:REF-UND" (up to 3 tries) and return the number following the "SATUN" prefix as int.

    Returns None if the value does not start with "SATUN" or the remainder is not an integer.
    Raises ValueError if all tries returned None.
    """
    prefix = "SATUN"
    n_tries = 3
    for attempt in range(1, n_tries + 1):
        res = PVEnumAdjustable("SATUN:REF-UND", internal=True).get()
        if res is not None:
            break
        print(f"try {attempt}/{n_tries}: got None instead of reference undulator number")
        sleep(0.1)
    else:
        # the loop ended without a break, i.e., every try returned None
        raise ValueError(f"got None instead of reference undulator number {n_tries} times")

    if not res.startswith(prefix):
        return None

    try:
        return int(res[len(prefix):])
    except ValueError:
        return None
|
|
|
|
|
|
|
|
|
|
|