From a116243573b6aecc16d920ef460177d0dea96721 Mon Sep 17 00:00:00 2001 From: gac-maloja Date: Tue, 16 Nov 2021 15:32:57 +0100 Subject: [PATCH] first try --- .gitignore | 139 +++++++++++++++++++++++ SetAthosUndEnergy.py | 24 ++++ SetAthosUndEnergy.sh | 4 + devices/undulator.py | 260 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 427 insertions(+) create mode 100644 .gitignore create mode 100755 SetAthosUndEnergy.py create mode 100755 SetAthosUndEnergy.sh create mode 100644 devices/undulator.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..73358ad --- /dev/null +++ b/.gitignore @@ -0,0 +1,139 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + diff --git a/SetAthosUndEnergy.py b/SetAthosUndEnergy.py new file mode 100755 index 0000000..221ef6b --- /dev/null +++ b/SetAthosUndEnergy.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python + +import argparse + +parser = argparse.ArgumentParser(description="Set Athos energy via undulators") +parser.add_argument("energy", type=float, help="Target energy in eV") +clargs = parser.parse_args() + + +from time import sleep +from devices.undulator import Undulators + +und = Undulators(adjust_chic=False) +sleep(1) # give PVs some time to connect + + +print("before:", und) +print("target:", clargs.energy, "eV") +print("actual change is commented out!") +#und.set(clargs.energy) +print("after: ", und) + + + diff --git a/SetAthosUndEnergy.sh b/SetAthosUndEnergy.sh new file mode 100755 index 0000000..1e89d88 --- /dev/null +++ b/SetAthosUndEnergy.sh @@ -0,0 +1,4 @@ +#!/bin/bash +cd $(dirname $0) +PYTHONPATH=../slic ./SetAthosUndEnergy.py $@ + diff --git a/devices/undulator.py b/devices/undulator.py new file mode 100644 index 0000000..58fb695 --- /dev/null +++ b/devices/undulator.py @@ -0,0 +1,260 @@ +from time import sleep +import numpy as np +from epics import PV + +from logzero import logger as log + +from slic.core.adjustable import Adjustable, PVAdjustable, PVEnumAdjustable +from slic.core.scanner.scanbackend import wait_for_all #, stop_all + + +UND_NAME_FMT = "SATUN{:02}-UIND030" +N_UND_CHIC = 14 + +N_UNDS = list(range(6, 22+1)) +N_UNDS.remove(N_UND_CHIC) + + + +class Undulators(Adjustable): + """ + for n_und_ref=None (default), the reference undulator currently used by the machine will be used + """ + + def __init__(self, n_unds=N_UNDS, n_und_ref=None, chic_fudge_offset=0, adjust_chic=True, scaled=True, ID="ATHOS_UNDULATORS", name="Athos Undulators", units="eV"): + super().__init__(ID, name=name, units=units) + + + machine_n_und_ref = get_machine_n_und_ref() + + if n_und_ref is None: + if machine_n_und_ref is None: + raise ValueError(f"could not read reference undulator currently used by the machine, please specify n_und_ref") + n_und_ref = machine_n_und_ref + + if n_und_ref != machine_n_und_ref: + log.warning(f"the chosen reference undulator ({n_und_ref}) is not the reference undulator currently used by the machine ({machine_n_und_ref})") + + + n_unds = list(n_unds) + + if N_UND_CHIC in n_unds: + log.warning(f"the CHIC ({N_UND_CHIC}) is in the list of active undulators: {n_unds}, and will be ignored/removed") + n_unds.remove(N_UND_CHIC) + + if n_und_ref not in n_unds: + raise ValueError(f"the reference undulator ({n_und_ref}) is not in the list of active undulators: {n_unds}") + + + self.n_unds = n_unds + self.n_und_ref = n_und_ref + + self.und_names = und_names = [UND_NAME_FMT.format(n) for n in n_unds] + self.und_name_cal = und_name_cal = UND_NAME_FMT.format(n_und_ref) + + self.adjs = {name: Undulator(name) for name in und_names} + self.chic = CHIC(chic_fudge_offset, name, units) + + + self.adjust_chic = adjust_chic + self.scaled = scaled + + self.convert = ConverterEK() + + a = self.adjs[und_name_cal] + self.scale = ScalerEK(a) + + + + def set_target_value(self, value, hold=False): + k = self.convert.K(value) + if np.isnan(k): + print("K is nan for", value) + return + print(f"{k} <- {value}") + + ks_current = [a.get_current_value(readback=False) for a in self.adjs.values()] + + if self.scaled: + header = "scaled: " + ks_target = self.scale.K(value, ks_current) + else: + header = "all equal:" + ks_target = k * np.ones_like(ks_current) + + print(header, ks_target) + print() + + def change(): + #TODO: replace by set_all_target_values_and_wait when print not needed anymore + tasks = [] + for (name, a), k_old, k_new in zip(self.adjs.items(), ks_current, ks_target): + delta = k_old - k_new + print(f"{name}: {k_old}\t->\t{k_new}\t({delta})") + if np.isnan(k_new): + print(f"{name} skipped since target K is nan") + continue + t = a.set_target_value(k_new, hold=False) + tasks.append(t) + wait_for_all(tasks) + + # make sure new K values have been written TODO: needed? + sleep(0.5) + + # switching on radial motors ... + wait_for_all([ + a.adj_radial_on.set_target_value(1, hold=False) for a in self.adjs.values() + ]) + + # ... and pushing go to ensure proper movements + wait_for_all([ + a.adj_radial_go.set_target_value(1, hold=False) for a in self.adjs.values() + ]) + + # make sure the undulators finished moving TODO: needed? + sleep(5) + + if self.adjust_chic: + print("CHIC adjustment follows") + self.chic.set_target_value(value, hold=False).wait() #TODO: test whether an additional sleep is needed + print("CHIC adjustment done") + else: + print("CHIC adjustment skipped") + + return self._as_task(change, hold=hold) + + + def get_current_value(self): + n = self.und_name_cal + a = self.adjs[n] + k = a.get_current_value() + energy = self.convert.E(k) + +# all_ks = [a.get_current_value() for a in self.adjs.values()] +# checks = np.isclose(all_ks, k, rtol=0, atol=0.001) +# if not all(checks): +# print(f"Warning: Ks are not all close to {k}:") +# for name, k, chk in zip(self.adjs.keys(), all_ks, checks): +# if not chk: +# print(name, k) + + return energy + + + def is_moving(self): + return any(a.is_moving() for a in self.adjs) + + + +class Undulator(PVAdjustable): + + def __init__(self, name, accuracy=0.0005): + pvname_setvalue = name + ":K_SET" + pvname_readback = name + ":K_READ" + super().__init__(pvname_setvalue, pvname_readback=pvname_readback, accuracy=accuracy, active_move=True, name=name, internal=True) + self.adj_energy = PVAdjustable(name + ":FELPHOTENE", internal=True) + self.adj_radial_on = PVAdjustable(name + ":RADIAL-ON.PROC", internal=True) + self.adj_radial_go = PVAdjustable(name + ":RADIAL-GO.PROC", internal=True) + + @property + def energy(self): + return self.adj_energy.get_current_value() * 1000 + + + +class ConverterEK: + + h = 4.135667696e-15 # eV * s + c = 299792458 # m / s + lambda_u = 38e-3 # m + const = 2 * h * c / lambda_u # eV + + electron_rest_energy = 0.51099895 # MeV + + # was: pvname_electron_energy="SATCL01-MBND100:P-READ" + def __init__(self, pvname_electron_energy="SATCB01:ENE-FILT-OP"): + self.pv_electron_energy = PV(pvname_electron_energy) + + def K(self, energy): + f = self.get_factor() + v = f / energy - 1 + return np.sqrt(2 * v) + + def E(self, k_value): + f = self.get_factor() + v = 1 + k_value**2 / 2 + return f / v + + def get_factor(self): + return self.const * self.get_gamma_squared() + + def get_gamma_squared(self): + electron_energy = self.pv_electron_energy.get() + gamma = electron_energy / self.electron_rest_energy + return gamma**2 + + + +class ScalerEK: + + def __init__(self, und_reference): + self.und = und_reference + + def K(self, energy_target, K_current=None): + if K_current is None: + K_current = self.und.get_current_value() + K_current = np.asarray(K_current) + energy_current = self.und.energy + energy_ratio = energy_current / energy_target + K_target_squared = energy_ratio * (K_current**2 + 2) - 2 + return np.sqrt(K_target_squared) + + + +class CHIC(PVAdjustable): + + def __init__(self, fudge_offset, name, units): + self.fudge_offset = fudge_offset + name += " CHIC Energy" + super().__init__("SATUN-CHIC:PHOTON-ENERGY", name=name) + self.pvs.start = PV("SATUN-CHIC:APPLY-DELAY-OFFSET-PHASE") + self.units = units + + + def set_target_value(self, value, hold=False): + fudge_offset = self.fudge_offset + print("CHIC fudge offset is", fudge_offset) + value -= fudge_offset + value /= 1000 + + def change(): + sleep(1) + print("CHIC setvalue") + self.pvs.setvalue.put(value, wait=True) + print("CHIC start") + self.pvs.start.put(1, wait=True) + #TODO: test whether an additional sleep is needed + sleep(1) + + return self._as_task(change, hold=hold) + + + def get_current_value(self): + return super().get_current_value() * 1000 + + + +def get_machine_n_und_ref(): + res = PVEnumAdjustable("SATUN:REF-UND").get() + if not res.startswith("SATUN"): + return None + n = len("SATUN") + res = res[n:] + try: + res = int(res) + except ValueError: + return None + return res + + +