Files
sfop/phases/full_polarization.py
2022-10-25 20:47:27 +02:00

300 lines
7.9 KiB
Python

from epics import PV
from os.path import dirname
from slic.core.adjustable import Adjustable, PVAdjustable
from slic.core.device import SimpleDevice
from slic.core.task import Task
from slic.utils import json_load
from slic.utils import unpickle
from .models.parallel_model import parallel2gap
#from .models.antiparallel_model import antiparallel2gap
from .models.antiparallel_model import antiparallel_k2rad_fit, antiparallel_k2rad_fit
from time import sleep
import numpy as np
RADIAL_LIMIT = 5.1
UND_PERIOD_PARALLEL = 38.0
UND_PERIOD_ANTIPARALLEL = 2 * UND_PERIOD_PARALLEL
UNDU_FIRST = 10
UNDU_LAST = 22
SETPHASE = -50
def check_phase(phase):
assert -180 <= phase <= 180
def fix_phase(p):
return ((p + 180) % 360) - 180
def convert_phase_to_shift(phase, und_period):
ratio = und_period / 360.0
return phase * ratio / 2
def convert_shift_to_phase(shift, und_period):
ratio = und_period / 360.0
return shift / ratio * 2
class UndPhases(Adjustable):
"""
Set of several UndPhase objects
allows to set the same phase to all undulators
"""
def __init__(self, ID, params, isparallel=True, und_names=None, **kwargs):
super().__init__(ID, **kwargs)
self.params = params
if und_names is None:
und_names = params.keys()
SUFFIX = "-UIND030:"
self.phases = [UndPhase(i + SUFFIX, params[i], isparallel=isparallel) for i in und_names]
def get_current_value(self):
vals = np.array([p.get() for p in self.phases])
vmin = vals.min()
vmax = vals.max()
vmean = vals.mean()
vstd = vals.std()
if vstd > 0.001:
print("there is a large spread in phase values:")
print(f"{vmin} ≤ ({vmean} ± {vstd}) ≤ {vmax}")
return vmean
def set_target_value(self, value):
tasks = [p.set_target_value(value) for p in self.phases]
for t, p in zip(tasks, self.phases):
t.wait()
print("done:", p.name)
def is_moving(self):
return any(p.is_moving() for p in self.phases)
class UndPhase(Adjustable):
"""
Combination of UndShift, UndRadial and UndTotalK
allows to set the phase of one undulator
"""
def __init__(self, ID, params, isparallel, **kwargs):
super().__init__(ID, **kwargs)
self.params = params
self.isparallel = isparallel
self.shift = UndShift(ID) if isparallel else UndShiftQuadrants(ID)
self.radial = UndRadial(ID)
self.totalk = UndTotalK(ID)
def get_current_value(self):
shift = self.shift.get_current_value()
und_period = UND_PERIOD_PARALLEL if self.isparallel else UND_PERIOD_ANTIPARALLEL
return convert_shift_to_phase(shift, und_period)
def set_target_value(self, value):
phase = value
phase = fix_phase(phase)
und_period = UND_PERIOD_PARALLEL if self.isparallel else UND_PERIOD_ANTIPARALLEL
shift = convert_phase_to_shift(phase, und_period)
k = self.totalk.get()
if self.isparallel:
radial = parallel2gap(k, phase, self.params)
else:
#radial = antiparallel2gap(k, phase, self.params)
radial = antiparallel_k2rad_fit(k, phase, self.params)
radial = round(radial, 4) #TODO: why?
# workaround for safety measure
if self.radial.get_current_value() <= RADIAL_LIMIT:
self.radial.set_target_value(RADIAL_LIMIT).wait()
self.shift.set_target_value(shift).wait()
self.radial.set_target_value(radial).wait()
def is_moving(self):
return self.shift.is_moving() or self.radial.is_moving()
class UndShiftRadialBase(PVAdjustable): #TODO: better name?
"""
Base class with functionality shared between UndShift and UndRadial
"""
def __init__(self, ID, accuracy=0.001):
super().__init__(ID + "-SET", ID + "-TL", accuracy=accuracy)
self.pv_on = PV(ID + "-ON")
self.pv_go = PV(ID + "-GO")
def set_target_value(self, value):
f = super().set_target_value
t = Task(
lambda: f(value)
)
t.start()
for i in range(2): #TODO: replace by active wait
sleep(i * 10)
sleep(0.3)
self.pv_on.put(1)
sleep(0.3)
self.pv_go.put(1)
t.wait()
class UndShift(UndShiftRadialBase):
def __init__(self, ID):
super().__init__(ID + "SHIFT")
class UndRadial(UndShiftRadialBase):
def __init__(self, ID):
super().__init__(ID + "RADIAL")
class UndShiftQuadrants(Adjustable):
def __init__(self, ID, accuracy=0.001):
ID += "SHIFT"
super().__init__(ID, units="xxx")
self.opposites = opposites = {
"TL": "BR",
"BL": "TR"
}
inverted_opposites = {v: k for k, v in opposites.items()}
opposites.update(inverted_opposites)
self.all_names = names = opposites.keys()
pvnames = [f"{ID}-{n}" for n in names]
adjs = {n: PVAdjustable(pvn + "-SET", pvn, accuracy=accuracy) for n, pvn in zip(names, pvnames)}
self.adjs = SimpleDevice(ID, **adjs)
self.pv_on = PV(ID + "-ON")
self.pv_go = PV(ID + "-GO")
def set_target_value(self, value):
which = "TL" if value >= 0 else "BL"
opposite = self.opposites[which]
names = [which, opposite]
for n in self.all_names:
if n not in names:
names.append(n)
vals = (value, -value, 0, 0)
tasks = [self.adjs.__dict__[n].set_target_value(v) for n, v in zip(names, vals)]
for i in range(2): #TODO: replace by active wait
sleep(i * 10)
sleep(0.3)
self.pv_on.put(1)
sleep(0.3)
self.pv_go.put(1)
for t in tasks:
t.wait()
def get_current_value(self):
d = self.adjs.__dict__
return d["TL"].get_current_value() - d["TR"].get_current_value()
def is_moving(self):
return any(a.is_moving() for a in self.adjs)
class UndTotalK:
"""
Helper class to get the total K from set value and taper
"""
def __init__(self, ID):
self.pv_kset = PV(ID + "K_SET")
self.pv_ktaperset = PV(ID + "K_TAPER_SET")
def get(self):
k = self.pv_kset.get()
ktaper = self.pv_ktaperset.get()
#TODO why?
k = round(k, 4)
ktaper = round(ktaper, 4)
return k + ktaper
if __name__ == "__main__":
print(__file__)
print(dirname(__file__))
basedir = dirname(__file__)
if basedir:
basedir += '/'
phase = SETPHASE
check_phase(phase)
und_first = UNDU_FIRST
und_last = UNDU_LAST
und_range = list(range(und_first, und_last+1, 1))
if 14 in und_range:
und_range.remove(14)
basename1 = 'SATUN'
undus = [basename1 + str(i).zfill(2) for i in und_range]
print(undus)
# old Excel parameter file with only legacy fixed polarization parameters
params1 = json_load(basedir + "UE38_all_parallel_parameters.json")
print('\n')
print(type(params1))
print(params1.keys())
# Excel parameter file with only fixed polarization parameters
params2 = json_load(basedir + "UE38_all_parameters.json")
print('\n')
print(type(params2))
print(params2.keys())
# Pickle parameter file with fixed and full polarization parameters
alldata = unpickle(basedir + 'UE38_meas_and_fit_data.pickle')
params = alldata['fitdata']
measdata = alldata['measdata']
print('\n')
print(type(params))
print(params.keys())
params = {a:v for a,v in params.items() if a in undus}
#for u in params.keys():
# if u not in undus:
# del(params[u])
ups = UndPhases("SATUN-PHASES", params, isparallel=False)
sleep(0.3)
print(ups.phases[0], ups.phases[0].shift)
# ups.phases[0].set_target_value(phase).wait()
ups.set_target_value(phase).wait()
print(ups.phases[0], ups.phases[0].shift)