large cleanup, part 1

This commit is contained in:
2023-09-13 17:29:36 +02:00
parent 89a76ee62a
commit ef7007582f
47 changed files with 5106 additions and 1323 deletions

313
beamline/PSSS_motion.py Normal file
View File

@ -0,0 +1,313 @@
#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# | |
# | Copyright (c) 2014 by Paul Scherrer Institute (http://www.psi.ch) |
# | |
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
# *-----------------------------------------------------------------------*
"""
motion procedures for PSSS:
mode:
0: homing
1: calc energy and SPECTRUM_X out of camera arm angle
move CamPosX out of CristBendRot
"""
from __future__ import print_function
from errno import ENETRESET
import logging, sys, os, json
import CaChannel, time
import numpy as np
class PSSS:
def __init__(self, args):
# import pdb; pdb.set_trace()
# sys.exit()
# {'mode': 0, 'stdout': True, 'var': []}
# {'mode': 5, 'stdout': True, 'var': ['SARFE10-PSSS059']}
self.args = args
prefix = self.args.var[0]
prefix = prefix[0 : prefix.find("-") + 1]
self.prefix = prefix
# print('Prefix='+prefix)
self.pv = dict()
def getPv(self, name):
try:
pv = self.pv[name]
except KeyError:
prefix = self.prefix
pv = CaChannel.CaChannel(prefix + name)
pv.searchw()
self.pv[name] = pv
return pv
def moveLimit(self, m, val):
self.moveAbs(m, val)
def waitMotionDone(self, m):
s = m + ".DMOV"
pv = self.getPv(s)
sys.stdout.write("wait motion " + s + " ")
sys.stdout.flush()
while pv.getw() == 0:
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(0.1)
print("done")
def setPos(self, m, ofs):
pvSET = self.getPv(m + ".SET")
pvVAL = self.getPv(m + ".VAL")
pvSET.putw(1) # Set
print("set ofset to %f" % ofs)
pvVAL.putw(ofs) # set value (without move)
time.sleep(0.1) # wait for safety
pvSET.putw(0) # Use
time.sleep(0.1) # wait for safety
def moveAbs(self, m, val):
pvVAL = self.getPv(m + ".VAL")
pvVAL.putw(val) # set position and move
def homing(self):
pvNameLst = (
"PSSS055:MOTOR_ROT_X1",
"PSSS055:MOTOR_X1",
"PSSS055:MOTOR_Y1",
"PSSS055:MOTOR_PROBE",
"PSSS059:MOTOR_X5",
"PSSS059:MOTOR_Y5",
"PSSS059:MOTOR_Z5",
"PSSS059:MOTOR_X3",
"PSSS059:MOTOR_Y3",
)
pvHOME = self.getPv("PSSS059:HOMING.VAL")
if pvHOME.getw(1) == 1:
print("homing still in progress. abort new procedure")
# return
pvHOME.putw(1) # homing in progress
try:
pvHomr = self.getPv(pvNameLst[0] + ".HOMR")
pvHomr.putw(1) # homing MOTOR_ROT_X1
self.waitMotionDone(pvNameLst[0])
homing = (
(1, 200, 10, 0), # PSSS055:MOTOR_X1 Home on +limit switch to +10mm
(2, 10, 4.475, 2.22), # PSSS055:MOTOR_Y1 Home on +limit switch to +4.475mm
(3, 50, 0, -9), # PSSS055:MOTOR_PROBE Home on +limit switch to +0mm
(4, 80, 35, 0), # PSSS059:MOTOR_X5 Home on +limit switch to +35mm
(5, 30, 10, 0), # PSSS059:MOTOR_Y5 Home on +limit switch to +10mm
# (6,20,8.9,0), # PSSS059:MOTOR_Z5 Home on +limit switch to +8.9mm Set HLM to 10, LLM to -2mm
(7, 30, 1, 0), # PSSS059:MOTOR_X3 Home on +limit switch to +10mm
(8, 30, 1, -1.4), # PSSS059:MOTOR_Y3 Home on +limit switch to +10mm
)
for idx, mvLim, setPos, mvPos in homing:
pvName = pvNameLst[idx]
print("homing %s %f %f %f" % (pvName, mvLim, setPos, mvPos))
self.moveLimit(pvName, mvLim)
for idx, mvLim, setPos, mvPos in homing:
pvName = pvNameLst[idx]
self.waitMotionDone(pvName)
self.setPos(pvName, setPos)
time.sleep(2)
print("sleep 2 sec.")
for idx, mvLim, setPos, mvPos in homing:
pvName = pvNameLst[idx]
self.moveAbs(pvName, mvPos)
for idx, mvLim, setPos, mvPos in homing:
pvName = pvNameLst[idx]
self.waitMotionDone(pvName)
except AssertionError as e: # BaseException as e:
print(e)
pvHOME.putw(3) # homing failed
else:
pvHOME.putw(2) # homing done
def set_energy_motor(self, energy2motor, scan=False, rotWait=False):
crystalType = self.getPv("PSSS059:CRYSTAL_SP").getw()
# crystalType=2
# print(crystalType)
# print(type(crystalType))
if crystalType == 0:
return
# 0 None
# 1 Si(111)R155
# 2 Si(220)R75
# 3 Si(220)R145
# 4 Si(220)R200
# 5 Si(333)R155
# load lut
fnXtlLst = (None, "Si111R155", "Si220R75", "Si220R145", "Si220R200", "Si333R155")
# print(__file__)
base = os.path.dirname(__file__)
fn = os.path.join(base, "lut" + fnXtlLst[crystalType] + ".txt")
lut = np.genfromtxt(fn, delimiter="\t", names=True)
# lut
# lut.dtype
# lut[1]['Energy']
# lut['Energy']
# lut=np.genfromtxt(fn, delimiter='\t',skip_header=1)
if energy2motor:
energy = self.getPv("PSSS059:ENERGY").getw()
# energy =6000
gratingType = self.getPv("PSSS055:GRATING_SP").getw()
# gratingType=3
camPosX = "CamPosX"
if gratingType in (1, 2):
camPosX += "_100nm"
elif gratingType == 3:
camPosX += "_150nm"
elif gratingType == 4:
camPosX += "_200nm"
camArmRot = np.interp(energy, lut["Energy"], lut["CamArmRot"])
cristBendRot = np.interp(energy, lut["Energy"], lut["CristBendRot"])
camPosX = np.interp(energy, lut["Energy"], lut[camPosX])
evPerPix = np.interp(energy, lut["Energy"], lut["EvPerPix"])
else:
camArmRot = self.getPv("PSSS059:MOTOR_ROT_X4").getw()
idx = ~np.isnan(lut["CamArmRot"])
lutCamArmRot = lut["CamArmRot"][idx]
energy = np.interp(camArmRot, lutCamArmRot[::-1], lut["Energy"][idx][::-1])
evPerPix = np.interp(camArmRot, lutCamArmRot[::-1], lut["EvPerPix"][idx][::-1])
# camera: 2560 x 2160
n = 2560
i = np.arange(n) - n / 2
spctr_x = energy + i * evPerPix
pv = self.getPv("PSSS059:SPECTRUM_X")
pv.putw(spctr_x)
pv = self.getPv("PSSS059:SPECTRUM_Y")
mu = 2560.0 / 2
sigma = 100.0
x = np.arange(2560.0)
spctr_y = 1000.0 * np.exp(-((x - mu) ** 2 / (2.0 * sigma**2)))
pv.putw(spctr_y)
if energy2motor:
print(
"energy2motor: camArmRot: %g cristBendRot: %g camPosX:%g evPerPix:%g"
% (camArmRot, cristBendRot, camPosX, evPerPix)
)
pv = self.getPv("PSSS059:MOTOR_ROT_X4")
pv.putw(camArmRot)
pv = self.getPv("PSSS059:MOTOR_ROT_X3")
pv.putw(cristBendRot)
if rotWait == True:
self.waitMotionDone("PSSS059:MOTOR_ROT_X3")
if scan == False: # if True the camera X position will not be changed
pv = self.getPv("PSSS059:MOTOR_X5")
pv.putw(camPosX)
else:
print("motor2energy: energy: %g evPerPix:%g" % (energy, evPerPix))
pv = self.getPv("PSSS059:ENERGY")
pv.putw(energy)
def grating2motor(self):
energy = self.getPv("PSSS059:ENERGY").getw()
gratingType = self.getPv("PSSS055:GRATING_SP").getw()
if gratingType == 0:
print("no grating")
girderX = 0.0
else:
base = os.path.dirname(__file__)
fn = fn = os.path.join(base, "lutGirderXTrans.txt")
lut = np.genfromtxt(fn, delimiter="\t", names=True)
d = {1: "100nm", 2: "100nm", 3: "150nm", 4: "200nm"}
print(gratingType)
grtStr = d[gratingType]
girderX = np.interp(energy, lut["Energy"], lut[grtStr])
print("girderX:%g" % (girderX))
pv = self.getPv("PSSS059:MOTOR_X2")
pv.putw(girderX)
if __name__ == "__main__":
from optparse import OptionParser, IndentedHelpFormatter
class MyFormatter(IndentedHelpFormatter):
"helper class for formating the OptionParser"
def __init__(self):
IndentedHelpFormatter.__init__(self)
def format_epilog(self, epilog):
if epilog:
return epilog
else:
return ""
def parse_args():
"main command line interpreter function"
(h, t) = os.path.split(sys.argv[0])
cmd = "\n " + (t if len(h) > 3 else sys.argv[0]) + " "
exampleCmd = ("MYPREFIX",)
epilog = (
__doc__
+ """
Examples:"""
+ "".join(map(lambda s: cmd + s, exampleCmd))
+ "\n "
)
fmt = MyFormatter()
parser = OptionParser(epilog=epilog, formatter=fmt)
parser.add_option("-m", "--mode", type="int", help="move instead of homing", default=0)
parser.add_option("-s", "--stdout", action="store_true", help="log to stdout instead of file")
(args, other) = parser.parse_args()
# print(args,other)
args.var = other
# args.var=('SARFE10-',)
fn = (
"/afs/psi.ch/intranet/Controls/scratch/"
+ os.path.basename(__file__)
+ "_"
+ os.environ.get("USER")
+ ".log"
)
if not args.stdout:
print("output redirected to file:\n" + fn)
stdoutBak = sys.stdout
sys.stdout = open(fn, "a+")
print("*" * 30 + "\n" + time.ctime() + ": run on host:" + os.environ.get("HOSTNAME"))
print("Args:" + str(args) + " " + str(args.var))
sys.stdout.flush()
psss = PSSS(args)
if args.mode == 0:
psss.homing()
elif args.mode == 1:
psss.set_energy_motor(energy2motor=True)
elif args.mode == 2:
psss.set_energy_motor(energy2motor=False)
elif args.mode == 3:
psss.grating2motor()
elif args.mode == 4:
psss.set_energy_motor(energy2motor=True, scan=True)
elif args.mode == 5:
psss.set_energy_motor(energy2motor=True, scan=True, rotWait=True)
print("PSSS_motion done.")
return
# os.environ['EPICS_CA_ADDR_LIST']='localhost'
# os.environ['EPICS_CA_ADDR_LIST']='172.26.0.255 172.26.2.255 172.26.8.255 172.26.16.255 172.26.24.255 172.26.32.255 172.26.40.255 172.26.110.255 172.26.111.255 172.26.120.255 129.129.242.255 129.129.243.255'
parse_args()

85
beamline/bernina_mono.py Normal file
View File

@ -0,0 +1,85 @@
from types import SimpleNamespace
from time import sleep
import numpy as np
from slic.core.adjustable import Adjustable, PVAdjustable, PVEnumAdjustable
from slic.core.device import Device
from slic.utils.hastyepics import get_pv as PV
from slic.devices.general.motor import Motor
class BerninaMono(Device):
def __init__(self, ID, name="Bernina DCM", **kwargs):
super().__init__(ID, name=name, **kwargs)
self.theta = Motor(ID + ":RX12")
self.x = Motor(ID + ":TX12")
self.gap = Motor(ID + ":T2")
self.roll1 = Motor(ID + ":RZ1")
self.roll2 = Motor(ID + ":RZ2")
self.pitch2 = Motor(ID + ":RX2")
self.energy = DoubleCrystalMonoEnergy(ID, name=name)
class DoubleCrystalMonoEnergy(Adjustable):
def __init__(self, ID, name=None):
self.wait_time = 0.1
pvname_setvalue = "SAROP21-ARAMIS:ENERGY_SP"
pvname_readback = "SAROP21-ARAMIS:ENERGY"
pvname_moving = "SAROP21-ODCM098:MOVING"
pvname_stop = "SAROP21-ODCM098:STOP.PROC"
pv_setvalue = PV(pvname_setvalue)
pv_readback = PV(pvname_readback)
pv_moving = PV(pvname_moving)
pv_stop = PV(pvname_stop)
units = pv_readback.units
super().__init__(ID, name=name, units=units)
self.pvnames = SimpleNamespace(
setvalue = pvname_setvalue,
readback = pvname_readback,
moving = pvname_moving,
stop = pvname_stop
)
self.pvs = SimpleNamespace(
setvalue = pv_setvalue,
readback = pv_readback,
moving = pv_moving,
stop = pv_stop
)
def get_current_value(self):
return self.pvs.readback.get()
def set_current_value(self, value):
self.pvs.setvalue.put(value)
sleep(3)
def set_target_value(self, value):
self.set_current_value(value)
# while abs(self.wait_for_valid_value() - value) > accuracy:
while self.is_moving():
sleep(self.wait_time)
def wait_for_valid_value(self):
val = np.nan
while not np.isfinite(val):
val = self.get_current_value()
return val
def is_moving(self):
moving = self.pvs.moving.get()
return bool(moving)
def stop(self):
self.pvs.stop.put(1)

400
beamline/undulator.py Normal file
View File

@ -0,0 +1,400 @@
from time import sleep
import numpy as np
from epics import PV
from logzero import logger as log
from slic.core.adjustable import Adjustable, PVAdjustable, PVEnumAdjustable
from slic.core.scanner.scanbackend import wait_for_all # , stop_all
import subprocess
UND_NAME_FMT = "SARUN{:02}-UIND030"
N_UND_CHIC = None
N_UNDS = list(range(3, 15 + 1))
# N_UNDS.remove(N_UND_CHIC) # does not exist
# from Alvra mono calibration
# energy_offset = 20.37839
# Cristallina without calibration
energy_offset = 0
# move the PSSS motor according to the energy
# TODO: improve this hack
PSSS_MOVE = True
def set_PSSS_energy(energy: float):
"""When scanning the energy with the undulator we
adjust the spectrometer to follow.
"""
energy = energy - energy_offset
print(f"Adjusting PSSS to {energy}")
PSSS_energy_PV_name = "SARFE10-PSSS059:ENERGY"
PSSS_energy_PV = PV(PSSS_energy_PV_name)
PSSS_energy_PV.put(energy, wait=True)
ret = subprocess.run(["python", "/sf/photo/src/PSSS_motor/qt/PSSS_motion.py", "-s", "-m5", "SARFE10-PSSS059"])
sleep(2)
if ret.returncode != 0:
log.warning("WARNING: PSSS adjustment failed.")
else:
print("Finished adjusting PSSS.")
class Undulators(Adjustable):
"""
for n_und_ref=None (default), the reference undulator currently used by the machine will be used
"""
def __init__(
self, n_unds=N_UNDS, n_und_ref=None, scaled=True, ID="ARAMIS_UNDULATORS", name="Aramis Undulators", units="eV"
):
# # don't allow setting these since there's no chic :)
# chic_fudge_offset = 0
# adjust_chic = False
super().__init__(ID, name=name, units=units)
machine_n_und_ref = get_machine_n_und_ref()
if n_und_ref is None:
if machine_n_und_ref is None:
raise ValueError(
f"could not read reference undulator currently used by the machine, please specify n_und_ref"
)
n_und_ref = machine_n_und_ref
if n_und_ref != machine_n_und_ref:
log.warning(
f"the chosen reference undulator ({n_und_ref}) is not the reference undulator currently used by the machine ({machine_n_und_ref})"
)
n_unds = list(n_unds)
if N_UND_CHIC in n_unds:
log.warning(
f"the CHIC ({N_UND_CHIC}) is in the list of active undulators: {n_unds}, and will be ignored/removed"
)
n_unds.remove(N_UND_CHIC)
if n_und_ref not in n_unds:
raise ValueError(f"the reference undulator ({n_und_ref}) is not in the list of active undulators: {n_unds}")
self.n_unds = n_unds
self.n_und_ref = n_und_ref
self.und_names = und_names = [UND_NAME_FMT.format(n) for n in n_unds]
self.und_name_cal = und_name_cal = UND_NAME_FMT.format(n_und_ref)
self.adjs = {name: Undulator(name) for name in und_names}
# self.chic = CHIC(chic_fudge_offset, name, units)
# self.phases = Phases(n_unds)
# self.adjust_chic = adjust_chic
self.scaled = scaled
self.convert = ConverterEK()
a = self.adjs[und_name_cal]
self.scale = ScalerEK(a)
def set_target_value(self, value, hold=False):
value = value + energy_offset
k = self.convert.K(value)
if np.isnan(k):
print("K is nan for", value)
return
print(f"{k} <- {value}")
ks_current = [a.get_current_value(readback=False) for a in self.adjs.values()]
if self.scaled:
header = "scaled: "
ks_target = self.scale.K(value, ks_current)
else:
header = "all equal:"
ks_target = k * np.ones_like(ks_current)
print(header, ks_target)
print()
def change():
# TODO: replace by set_all_target_values_and_wait when print not needed anymore
tasks = []
for (name, a), k_old, k_new in zip(self.adjs.items(), ks_current, ks_target):
delta = k_old - k_new
print(f"{name}: {k_old}\t->\t{k_new}\t({delta})")
if np.isnan(k_new):
print(f"{name} skipped since target K is nan")
continue
t = a.set_target_value(k_new, hold=False)
tasks.append(t)
wait_for_all(tasks)
# # make sure new K values have been written TODO: needed?
# sleep(2) # check if this can be shortened back to 0.5
# # switching on radial motors ...
# wait_for_all([
# a.adj_radial_on.set_target_value(1, hold=False) for a in self.adjs.values()
# ])
# # press a few times
# for _ in range(3):
# # ... and pushing go to ensure proper movements
# wait_for_all([
# a.adj_radial_go.set_target_value(1, hold=False) for a in self.adjs.values()
# ])
# sleep(0.2)
# # make sure the undulators finished moving TODO: needed?
# sleep(5)
# if self.adjust_chic:
# print("CHIC adjustment follows")
## self.chic.set_target_value(value, hold=False).wait() #TODO: test whether an additional sleep is needed
# self.phases.set(value)
# print("CHIC adjustment done")
# else:
# print("CHIC adjustment skipped")
# make sure the undulators and phases finished moving TODO: needed?
sleep(5)
if not PSSS_MOVE:
print("no PSSS movement enabled")
else:
set_PSSS_energy(value)
return self._as_task(change, hold=hold)
def get_current_value(self):
n = self.und_name_cal
a = self.adjs[n]
k = a.get_current_value()
energy = self.convert.E(k) - energy_offset
# all_ks = [a.get_current_value() for a in self.adjs.values()]
# checks = np.isclose(all_ks, k, rtol=0, atol=0.001)
# if not all(checks):
# print(f"Warning: Ks are not all close to {k}:")
# for name, k, chk in zip(self.adjs.keys(), all_ks, checks):
# if not chk:
# print(name, k)
return energy # if we need to fudge the number to match the mono, do it here!
def is_moving(self):
return any(a.is_moving() for a in self.adjs)
class Undulator(PVAdjustable):
def __init__(self, name, accuracy=0.0005):
pvname_setvalue = name + ":K_SET"
pvname_readback = pvname_setvalue # name + ":K_READ" #TODO: there are no readback values?
super().__init__(
pvname_setvalue,
pvname_readback=pvname_readback,
accuracy=accuracy,
active_move=True,
name=name,
internal=True,
)
self.adj_energy = PVAdjustable(name + ":FELPHOTENE", internal=True)
# self.adj_radial_on = PVAdjustable(name + ":RADIAL-ON", internal=True) #TODO: do not exist
# self.adj_radial_go = PVAdjustable(name + ":RADIAL-GO", internal=True) #TODO: do not exist
# self.adj_radial_on_proc = PVAdjustable(name + ":RADIAL-ON.PROC", internal=True)
# self.adj_radial_go_proc = PVAdjustable(name + ":RADIAL-GO.PROC", internal=True)
@property
def energy(self):
return self.adj_energy.get_current_value() * 1000
class ConverterEK:
h = 4.135667696e-15 # eV * s
c = 299792458 # m / s
lambda_u = 15e-3 # m
const = 2 * h * c / lambda_u # eV
electron_rest_energy = 0.51099895 # MeV
# was: pvname_electron_energy="SATCL01-MBND100:P-READ"
# was: pvname_electron_energy="SATCB01:ENE-FILT-OP"
def __init__(self, pvname_electron_energy="SARCL02-MBND100:P-READ"): # S30CB13:ENE-FILT-OP
self.pv_electron_energy = PV(pvname_electron_energy)
def K(self, energy):
f = self.get_factor()
v = f / energy - 1
return np.sqrt(2 * v)
def E(self, k_value):
f = self.get_factor()
v = 1 + k_value**2 / 2
return f / v
def get_factor(self):
return self.const * self.get_gamma_squared()
def get_gamma_squared(self):
electron_energy = self.pv_electron_energy.get()
gamma = electron_energy / self.electron_rest_energy
return gamma**2
class ScalerEK:
def __init__(self, und_reference):
self.und = und_reference
def K(self, energy_target, K_current=None):
if K_current is None:
K_current = self.und.get_current_value()
K_current = np.asarray(K_current)
energy_current = self.und.energy
energy_ratio = energy_current / energy_target
K_target_squared = energy_ratio * (K_current**2 + 2) - 2
return np.sqrt(K_target_squared)
# class CHIC(PVAdjustable):
#
# def __init__(self, fudge_offset, name, units):
# self.fudge_offset = fudge_offset
# name += " CHIC Energy"
# super().__init__("SATUN-CHIC:PHOTON-ENERGY", name=name)
# self.pvs.start = PV("SATUN-CHIC:APPLY-DELAY-OFFSET-PHASE")
# self.units = units
#
#
# def set_target_value(self, value, hold=False):
# fudge_offset = self.fudge_offset
# print("CHIC fudge offset is", fudge_offset)
# value -= fudge_offset
# value /= 1000
#
# def change():
# sleep(1)
# print("CHIC setvalue")
# self.pvs.setvalue.put(value, wait=True)
# print("CHIC start")
# self.pvs.start.put(1, wait=True)
# #TODO: test whether an additional sleep is needed
# sleep(1)
#
# return self._as_task(change, hold=hold)
#
#
# def get_current_value(self):
# return super().get_current_value() * 1000
# class TwoColorChicane(PVAdjustable):
#
# def __init__(self, name):#, t0=0):
## self.t0 = t0
## name += " Two Color Chicane"
# super().__init__("SATUN14-MBND100:I-SET", "SATUN14-MBND100:I-READ", process_time=1, name=name)
#
## def set_target_value(self, value, hold=False):
## super().set_target_value(value)
#
## def get_current_value(self):
## return super().get_current_value()
# class Phases:
#
# def __init__(self, n_unds=N_UNDS):
# # 22 does not have a chicane
# n_unds = n_unds.copy()
# if 22 in n_unds:
# n_unds.remove(22)
#
# # 22 does not have a chicane
# n_unds_all = N_UNDS.copy()
# if 22 in n_unds_all:
# n_unds_all.remove(22)
#
# self.cb_auto_good = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in n_unds}
# self.cb_auto_bad = {i: PV(f"SATUN{i:02}-CHIC:AUTO-PHASING") for i in set(n_unds_all) - set(n_unds)}
#
# self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY")
# self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY")
#
#
# def set(self, energy):
# for cb in self.cb_auto_good.values(): cb.put(int(False))
# for cb in self.cb_auto_bad.values(): cb.put(int(False))
# sleep(0.1)
#
# pv_fixed_energy = self.pv_fixed_energy
#
# current_state = pv_fixed_energy.get()
# pv_fixed_energy.put(int(True)) # enforce fixed energy
#
# self.pv_energy.put(energy / 1000) # in keV
# sleep(0.1)
#
# for cb in self.cb_auto_good.values(): cb.put(int(True))
# sleep(0.1)
#
# for cb in self.cb_auto_good.values(): cb.put(int(False))
# for cb in self.cb_auto_bad.values(): cb.put(int(False))
# sleep(0.1)
#
## pv_fixed_energy.put(current_state) # reset to original state
# class Phases:
# def __init__(self, n_unds=N_UNDS):
# # 22 does not have a chicane
# n_unds = n_unds.copy()
# if 22 in n_unds:
# n_unds.remove(22)
# self.pv_energy = PV("SATUN-CHIC:PHOTON-ENERGY")
# self.pv_fixed_energy = PV("SATUN-CHIC:FIX_PHOTON_ENERGY")
# self.adjs_apply = {i: PVAdjustable(f"SATUN{i:02}-CHIC:APPLY-DOP", internal=True) for i in n_unds}
# def set(self, energy):
# pv_energy = self.pv_energy
# pv_fixed_energy = self.pv_fixed_energy
# current_state = pv_fixed_energy.get()
# pv_fixed_energy.put(int(True)) # enforce fixed energy
# pv_energy.put(energy / 1000) # in keV
# sleep(0.1)
# self.update()
## sleep(10)
# pv_fixed_energy.put(current_state) # reset to original state
# def update(self):
# wait_for_all([
# adj_apply.set_target_value(1) for adj_apply in self.adjs_apply.values()
# ])
def get_machine_n_und_ref():
res = PVEnumAdjustable("SARUN:REF-UND").get()
if not res.startswith("SARUN"):
return None
n = len("SARUN")
res = res[n:]
try:
res = int(res)
except ValueError:
return None
return res