first try

This commit is contained in:
gac-maloja
2021-11-16 15:32:57 +01:00
parent 82e8da8e80
commit a116243573
4 changed files with 427 additions and 0 deletions

139
.gitignore vendored Normal file
View File

@ -0,0 +1,139 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/

24
SetAthosUndEnergy.py Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python
import argparse
parser = argparse.ArgumentParser(description="Set Athos energy via undulators")
parser.add_argument("energy", type=float, help="Target energy in eV")
clargs = parser.parse_args()
from time import sleep
from devices.undulator import Undulators
und = Undulators(adjust_chic=False)
sleep(1) # give PVs some time to connect
print("before:", und)
print("target:", clargs.energy, "eV")
print("actual change is commented out!")
#und.set(clargs.energy)
print("after: ", und)

4
SetAthosUndEnergy.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
cd $(dirname $0)
PYTHONPATH=../slic ./SetAthosUndEnergy.py $@

260
devices/undulator.py Normal file
View File

@ -0,0 +1,260 @@
from time import sleep
import numpy as np
from epics import PV
from logzero import logger as log
from slic.core.adjustable import Adjustable, PVAdjustable, PVEnumAdjustable
from slic.core.scanner.scanbackend import wait_for_all #, stop_all
UND_NAME_FMT = "SATUN{:02}-UIND030"
N_UND_CHIC = 14
N_UNDS = list(range(6, 22+1))
N_UNDS.remove(N_UND_CHIC)
class Undulators(Adjustable):
"""
for n_und_ref=None (default), the reference undulator currently used by the machine will be used
"""
def __init__(self, n_unds=N_UNDS, n_und_ref=None, chic_fudge_offset=0, adjust_chic=True, scaled=True, ID="ATHOS_UNDULATORS", name="Athos Undulators", units="eV"):
super().__init__(ID, name=name, units=units)
machine_n_und_ref = get_machine_n_und_ref()
if n_und_ref is None:
if machine_n_und_ref is None:
raise ValueError(f"could not read reference undulator currently used by the machine, please specify n_und_ref")
n_und_ref = machine_n_und_ref
if n_und_ref != machine_n_und_ref:
log.warning(f"the chosen reference undulator ({n_und_ref}) is not the reference undulator currently used by the machine ({machine_n_und_ref})")
n_unds = list(n_unds)
if N_UND_CHIC in n_unds:
log.warning(f"the CHIC ({N_UND_CHIC}) is in the list of active undulators: {n_unds}, and will be ignored/removed")
n_unds.remove(N_UND_CHIC)
if n_und_ref not in n_unds:
raise ValueError(f"the reference undulator ({n_und_ref}) is not in the list of active undulators: {n_unds}")
self.n_unds = n_unds
self.n_und_ref = n_und_ref
self.und_names = und_names = [UND_NAME_FMT.format(n) for n in n_unds]
self.und_name_cal = und_name_cal = UND_NAME_FMT.format(n_und_ref)
self.adjs = {name: Undulator(name) for name in und_names}
self.chic = CHIC(chic_fudge_offset, name, units)
self.adjust_chic = adjust_chic
self.scaled = scaled
self.convert = ConverterEK()
a = self.adjs[und_name_cal]
self.scale = ScalerEK(a)
def set_target_value(self, value, hold=False):
k = self.convert.K(value)
if np.isnan(k):
print("K is nan for", value)
return
print(f"{k} <- {value}")
ks_current = [a.get_current_value(readback=False) for a in self.adjs.values()]
if self.scaled:
header = "scaled: "
ks_target = self.scale.K(value, ks_current)
else:
header = "all equal:"
ks_target = k * np.ones_like(ks_current)
print(header, ks_target)
print()
def change():
#TODO: replace by set_all_target_values_and_wait when print not needed anymore
tasks = []
for (name, a), k_old, k_new in zip(self.adjs.items(), ks_current, ks_target):
delta = k_old - k_new
print(f"{name}: {k_old}\t->\t{k_new}\t({delta})")
if np.isnan(k_new):
print(f"{name} skipped since target K is nan")
continue
t = a.set_target_value(k_new, hold=False)
tasks.append(t)
wait_for_all(tasks)
# make sure new K values have been written TODO: needed?
sleep(0.5)
# switching on radial motors ...
wait_for_all([
a.adj_radial_on.set_target_value(1, hold=False) for a in self.adjs.values()
])
# ... and pushing go to ensure proper movements
wait_for_all([
a.adj_radial_go.set_target_value(1, hold=False) for a in self.adjs.values()
])
# make sure the undulators finished moving TODO: needed?
sleep(5)
if self.adjust_chic:
print("CHIC adjustment follows")
self.chic.set_target_value(value, hold=False).wait() #TODO: test whether an additional sleep is needed
print("CHIC adjustment done")
else:
print("CHIC adjustment skipped")
return self._as_task(change, hold=hold)
def get_current_value(self):
n = self.und_name_cal
a = self.adjs[n]
k = a.get_current_value()
energy = self.convert.E(k)
# all_ks = [a.get_current_value() for a in self.adjs.values()]
# checks = np.isclose(all_ks, k, rtol=0, atol=0.001)
# if not all(checks):
# print(f"Warning: Ks are not all close to {k}:")
# for name, k, chk in zip(self.adjs.keys(), all_ks, checks):
# if not chk:
# print(name, k)
return energy
def is_moving(self):
return any(a.is_moving() for a in self.adjs)
class Undulator(PVAdjustable):
def __init__(self, name, accuracy=0.0005):
pvname_setvalue = name + ":K_SET"
pvname_readback = name + ":K_READ"
super().__init__(pvname_setvalue, pvname_readback=pvname_readback, accuracy=accuracy, active_move=True, name=name, internal=True)
self.adj_energy = PVAdjustable(name + ":FELPHOTENE", internal=True)
self.adj_radial_on = PVAdjustable(name + ":RADIAL-ON.PROC", internal=True)
self.adj_radial_go = PVAdjustable(name + ":RADIAL-GO.PROC", internal=True)
@property
def energy(self):
return self.adj_energy.get_current_value() * 1000
class ConverterEK:
h = 4.135667696e-15 # eV * s
c = 299792458 # m / s
lambda_u = 38e-3 # m
const = 2 * h * c / lambda_u # eV
electron_rest_energy = 0.51099895 # MeV
# was: pvname_electron_energy="SATCL01-MBND100:P-READ"
def __init__(self, pvname_electron_energy="SATCB01:ENE-FILT-OP"):
self.pv_electron_energy = PV(pvname_electron_energy)
def K(self, energy):
f = self.get_factor()
v = f / energy - 1
return np.sqrt(2 * v)
def E(self, k_value):
f = self.get_factor()
v = 1 + k_value**2 / 2
return f / v
def get_factor(self):
return self.const * self.get_gamma_squared()
def get_gamma_squared(self):
electron_energy = self.pv_electron_energy.get()
gamma = electron_energy / self.electron_rest_energy
return gamma**2
class ScalerEK:
def __init__(self, und_reference):
self.und = und_reference
def K(self, energy_target, K_current=None):
if K_current is None:
K_current = self.und.get_current_value()
K_current = np.asarray(K_current)
energy_current = self.und.energy
energy_ratio = energy_current / energy_target
K_target_squared = energy_ratio * (K_current**2 + 2) - 2
return np.sqrt(K_target_squared)
class CHIC(PVAdjustable):
def __init__(self, fudge_offset, name, units):
self.fudge_offset = fudge_offset
name += " CHIC Energy"
super().__init__("SATUN-CHIC:PHOTON-ENERGY", name=name)
self.pvs.start = PV("SATUN-CHIC:APPLY-DELAY-OFFSET-PHASE")
self.units = units
def set_target_value(self, value, hold=False):
fudge_offset = self.fudge_offset
print("CHIC fudge offset is", fudge_offset)
value -= fudge_offset
value /= 1000
def change():
sleep(1)
print("CHIC setvalue")
self.pvs.setvalue.put(value, wait=True)
print("CHIC start")
self.pvs.start.put(1, wait=True)
#TODO: test whether an additional sleep is needed
sleep(1)
return self._as_task(change, hold=hold)
def get_current_value(self):
return super().get_current_value() * 1000
def get_machine_n_und_ref():
res = PVEnumAdjustable("SATUN:REF-UND").get()
if not res.startswith("SATUN"):
return None
n = len("SATUN")
res = res[n:]
try:
res = int(res)
except ValueError:
return None
return res