Files
cristallina/beamline/undulator.py

415 lines
13 KiB
Python
Executable File

import time
from time import sleep
import numpy as np
from epics import PV
from logzero import logger as log
from slic.core.adjustable import Adjustable, PVAdjustable, PVEnumAdjustable
from slic.core.scanner.scanbackend import wait_for_all # , stop_all
import subprocess
UND_NAME_FMT = "SARUN{:02}-UIND030"
N_UND_CHIC = None
N_UNDS = list(range(3, 15 + 1))
# N_UNDS.remove(N_UND_CHIC) # does not exist
# from Alvra mono calibration
# energy_offset = 20.37839
# Cristallina without calibration
# offset is the difference between PSSS and undulator setpoint
# sign convention: Undulator - PSSS
energy_offset = 0 # eV
# PSSS = 8355 eV, machine = 8253 eV
# move the PSSS motor according to the energy
# TODO: improve this hack
PSSS_MOVE = True
def set_PSSS_energy(energy: float):
"""When scanning the energy with the undulator we
adjust the spectrometer to follow.
"""
energy = energy - energy_offset
print(f"Adjusting PSSS to {energy}")
PSSS_energy_PV_name = "SARFE10-PSSS059:ENERGY"
PSSS_energy_PV = PV(PSSS_energy_PV_name)
PSSS_energy_PV.put(energy, wait=True)
ret = subprocess.run(["python", "/sf/photo/src/PSSS_motor/qt/PSSS_motion.py", "-s", "-m5", "SARFE10-PSSS059"])
sleep(2)
if ret.returncode != 0:
log.warning("WARNING: PSSS adjustment failed.")
else:
print("Finished adjusting PSSS.")
class Undulators(Adjustable):
"""
for n_und_ref=None (default), the reference undulator currently used by the machine will be used
"""
def __init__(
self, n_unds=N_UNDS, n_und_ref=None, scaled=True, ID="ARAMIS_UNDULATORS", name="Aramis Undulators", units="eV"
):
# # don't allow setting these since there's no chic :)
# chic_fudge_offset = 0
# adjust_chic = False
super().__init__(ID, name=name, units=units)
machine_n_und_ref = get_machine_n_und_ref()
if n_und_ref is None:
if machine_n_und_ref is None:
raise ValueError(
f"could not read reference undulator currently used by the machine, please specify n_und_ref"
)
n_und_ref = machine_n_und_ref
if n_und_ref != machine_n_und_ref:
log.warning(
f"the chosen reference undulator ({n_und_ref}) is not the reference undulator currently used by the machine ({machine_n_und_ref})"
)
n_unds = list(n_unds)
if N_UND_CHIC in n_unds:
log.warning(
f"the CHIC ({N_UND_CHIC}) is in the list of active undulators: {n_unds}, and will be ignored/removed"
)
n_unds.remove(N_UND_CHIC)
if n_und_ref not in n_unds:
raise ValueError(f"the reference undulator ({n_und_ref}) is not in the list of active undulators: {n_unds}")
self.n_unds = n_unds
self.n_und_ref = n_und_ref
self.und_names = und_names = [UND_NAME_FMT.format(n) for n in n_unds]
self.und_name_cal = und_name_cal = UND_NAME_FMT.format(n_und_ref)
self.adjs = {name: Undulator(name) for name in und_names}
# self.chic = CHIC(chic_fudge_offset, name, units)
# self.phases = Phases(n_unds)
# self.adjust_chic = adjust_chic
self.scaled = scaled
self.convert = ConverterEK()
a = self.adjs[und_name_cal]
self.scale = ScalerEK(a)
def set_target_value(self, value, hold=False):
value = value + energy_offset
k = self.convert.K(value)
if np.isnan(k):
print("K is nan for", value)
return
print(f"{k} <- {value}")
ks_current = [a.get_current_value(readback=False) for a in self.adjs.values()]
if self.scaled:
header = "scaled: "
ks_target = self.scale.K(value, ks_current)
else:
header = "all equal:"
ks_target = k * np.ones_like(ks_current)
print(header, ks_target)
print()
def change():
# TODO: replace by set_all_target_values_and_wait when print not needed anymore
tasks = []
for (name, a), k_old, k_new in zip(self.adjs.items(), ks_current, ks_target):
delta = k_old - k_new
print(f"{name}: {k_old}\t->\t{k_new}\t({delta})")
if np.isnan(k_new):
print(f"{name} skipped since target K is nan")
continue
t = a.set_target_value(k_new, hold=False)
tasks.append(t)
wait_for_all(tasks)
# # make sure new K values have been written TODO: needed?
# sleep(2) # check if this can be shortened back to 0.5
# # switching on radial motors ...
# wait_for_all([
# a.adj_radial_on.set_target_value(1, hold=False) for a in self.adjs.values()
# ])
# # press a few times
# for _ in range(3):
# # ... and pushing go to ensure proper movements
# wait_for_all([
# a.adj_radial_go.set_target_value(1, hold=False) for a in self.adjs.values()
# ])
# sleep(0.2)
# # make sure the undulators finished moving TODO: needed?
# sleep(5)
# if self.adjust_chic:
# print("CHIC adjustment follows")
## self.chic.set_target_value(value, hold=False).wait() #TODO: test whether an additional sleep is needed
# self.phases.set(value)
# print("CHIC adjustment done")
# else:
# print("CHIC adjustment skipped")
# make sure the undulators and phases finished moving TODO: needed?
sleep(5)
if not PSSS_MOVE:
print("no PSSS movement enabled")
else:
set_PSSS_energy(value)
return self._as_task(change, hold=hold)
def get_current_value(self):
n = self.und_name_cal
a = self.adjs[n]
k = a.get_current_value()
energy = self.convert.E(k) - energy_offset
# all_ks = [a.get_current_value() for a in self.adjs.values()]
# checks = np.isclose(all_ks, k, rtol=0, atol=0.001)
# if not all(checks):
# print(f"Warning: Ks are not all close to {k}:")
# for name, k, chk in zip(self.adjs.keys(), all_ks, checks):
# if not chk:
# print(name, k)
return energy # if we need to fudge the number to match the mono, do it here!
def is_moving(self):
return any(a.is_moving() for a in self.adjs)
class Undulator(PVAdjustable):
def __init__(self, name, accuracy=0.0005):
pvname_setvalue = name + ":K_SET"
pvname_readback = pvname_setvalue # name + ":K_READ" #TODO: there are no readback values?
super().__init__(
pvname_setvalue,
pvname_readback=pvname_readback,
accuracy=accuracy,
active_move=True,
name=name,
internal=True,
)
self.adj_energy = PVAdjustable(name + ":FELPHOTENE", internal=True)
# self.adj_radial_on = PVAdjustable(name + ":RADIAL-ON", internal=True) #TODO: do not exist
# self.adj_radial_go = PVAdjustable(name + ":RADIAL-GO", internal=True) #TODO: do not exist
# self.adj_radial_on_proc = PVAdjustable(name + ":RADIAL-ON.PROC", internal=True)
# self.adj_radial_go_proc = PVAdjustable(name + ":RADIAL-GO.PROC", internal=True)
@property
def energy(self):
return self.adj_energy.get_current_value() * 1000
class ConverterEK:
h = 4.135667696e-15 # eV * s
c = 299792458 # m / s
lambda_u = 15e-3 # m
const = 2 * h * c / lambda_u # eV
electron_rest_energy = 0.51099895 # MeV
# was: pvname_electron_energy="SATCL01-MBND100:P-READ"
# was: pvname_electron_energy="SATCB01:ENE-FILT-OP"
def __init__(self, pvname_electron_energy="SARCL02-MBND100:P-READ"): # S30CB13:ENE-FILT-OP
self.pv_electron_energy = PV(pvname_electron_energy)
def K(self, energy):
f = self.get_factor()
v = f / energy - 1
return np.sqrt(2 * v)
def E(self, k_value):
f = self.get_factor()
v = 1 + k_value**2 / 2
return f / v
def get_factor(self):
return self.const * self.get_gamma_squared()
def get_gamma_squared(self):
electron_energy = self.pv_electron_energy.get()
gamma = electron_energy / self.electron_rest_energy
return gamma**2
class ScalerEK:
def __init__(self, und_reference):
self.und = und_reference
def K(self, energy_target, K_current=None):
if K_current is None:
K_current = self.und.get_current_value()
K_current = np.asarray(K_current)
energy_current = self.und.energy
energy_ratio = energy_current / energy_target
K_target_squared = energy_ratio * (K_current**2 + 2) - 2
return np.sqrt(K_target_squared)
# class CHIC(PVAdjustable):
#
# def __init__(self, fudge_offset, name, units):
# self.fudge_offset = fudge_offset
# name += " CHIC Energy"
# super().__init__("SATUN-CHIC:PHOTON-ENERGY", name=name)
# self.pvs.start = PV("SATUN-CHIC:APPLY-DELAY-OFFSET-PHASE")
# self.units = units
#
#
# def set_target_value(self, value, hold=False):
# fudge_offset = self.fudge_offset
# print("CHIC fudge offset is", fudge_offset)
# value -= fudge_offset
# value /= 1000
#
# def change():
# sleep(1)
# print("CHIC setvalue")
# self.pvs.setvalue.put(value, wait=True)
# print("CHIC start")
# self.pvs.start.put(1, wait=True)
# #TODO: test whether an additional sleep is needed
# sleep(1)
#
# return self._as_task(change, hold=hold)
#
#
# def get_current_value(self):
# return super().get_current_value() * 1000
# class TwoColorChicane(PVAdjustable):
#
# def __init__(self, name):#, t0=0):
## self.t0 = t0
## name += " Two Color Chicane"
# super().__init__("SATUN14-MBND100:I-SET", "SATUN14-MBND100:I-READ", process_time=1, name=name)
#
## def set_target_value(self, value, hold=False):
## super().set_target_value(value)
#
## def get_current_value(self):
## return super().get_current_value()
# class Phases:
#
# def __init__(self, n_unds=N_UNDS):
# # 22 does not have a chicane
# n_unds = n_unds.copy()
# if 22 in n_unds:
# n_unds.remove(22)
#
# # 22 does not have a chicane
# n_unds_all = N_UNDS.copy()
# if 22 in n_unds_all:
# n_unds_all.remove(22)
#
# self.cb_auto_good = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in n_unds}
# self.cb_auto_bad = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in set(n_unds_all) - set(n_unds)}
#
# self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY")
# self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY")
#
#
# def set(self, energy):
# for cb in self.cb_auto_good.values(): cb.put(int(False))
# for cb in self.cb_auto_bad.values(): cb.put(int(False))
# sleep(0.1)
#
# pv_fixed_energy = self.pv_fixed_energy
#
# current_state = pv_fixed_energy.get()
# pv_fixed_energy.put(int(True)) # enforce fixed energy
#
# self.pv_energy.put(energy / 1000) # in keV
# sleep(0.1)
#
# for cb in self.cb_auto_good.values(): cb.put(int(True))
# sleep(0.1)
#
# for cb in self.cb_auto_good.values(): cb.put(int(False))
# for cb in self.cb_auto_bad.values(): cb.put(int(False))
# sleep(0.1)
#
## pv_fixed_energy.put(current_state) # reset to original state
# class Phases:
# def __init__(self, n_unds=N_UNDS):
# # 22 does not have a chicane
# n_unds = n_unds.copy()
# if 22 in n_unds:
# n_unds.remove(22)
# self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY")
# self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY")
# self.adjs_apply = {i: PVAdjustable(f"SATUN{i:02}-CHIC:APPLY-DOP", internal=True) for i in n_unds}
# def set(self, energy):
# pv_energy = self.pv_energy
# pv_fixed_energy = self.pv_fixed_energy
# current_state = pv_fixed_energy.get()
# pv_fixed_energy.put(int(True)) # enforce fixed energy
# pv_energy.put(energy / 1000) # in keV
# sleep(0.1)
# self.update()
## sleep(10)
# pv_fixed_energy.put(current_state) # reset to original state
# def update(self):
# wait_for_all([
# adj_apply.set_target_value(1) for adj_apply in self.adjs_apply.values()
# ])
def get_machine_n_und_ref():
for attempt in range(3):
try:
res = PVEnumAdjustable("SARUN:REF-UND").get()
if not res.startswith("SARUN"):
return None
except AttributeError as e:
print("Error getting undulator, retrying")
print(e)
time.sleep(1)
else:
break
n = len("SARUN")
res = res[n:]
try:
res = int(res)
except ValueError:
return None
return res