185 lines
5.8 KiB
Python
Executable File
185 lines
5.8 KiB
Python
Executable File
from time import sleep
|
|
|
|
from slic.core.adjustable import Adjustable, PVAdjustable
|
|
from slic.core.device import Device, SimpleDevice
|
|
from slic.devices.general.motor import Motor
|
|
from slic.utils.hastyepics import get_pv as PV
|
|
|
|
|
|
|
|
class Attenuator(Device):
|
|
|
|
def __init__(self, ID, energy_threshold=1500, name="Attenuator Aramis", process_time=1, **kwargs):
|
|
super().__init__(ID, name=name, **kwargs)
|
|
|
|
|
|
# self.motor_limits = motor_limits
|
|
|
|
# self.motors = motors = LimitedMotors(ID, motor_limits)
|
|
self.motor_status_pvs = [PVAdjustable(f"{ID}:MOTOR_{i}.DMOV") for i in [1,2,3,4,5,6]]
|
|
|
|
# self.motors = LimitedMotors(ID, motor_limits)
|
|
|
|
self.trans1st = Transmission(ID, process_time=process_time)
|
|
self.trans3rd = Transmission(ID, process_time=process_time, third_order=True)
|
|
self.energy = LimitedEnergy(ID, energy_threshold, process_time)
|
|
self.foils = [Foil(ID, i+1) for i in range(6)]
|
|
|
|
|
|
def get_current_transmission(self):
|
|
return self.trans1st.get_current_value()
|
|
|
|
def get_current_transmission_third_harmonic(self):
|
|
return self.trans3rd.get_current_value()
|
|
|
|
def set_transmission(self, value, energy=None):
|
|
self.energy.set_target_value(energy)
|
|
self.trans1st.set_target_value(value)
|
|
|
|
def set_transmission_third_harmonic(self, value, energy=None):
|
|
self.energy.set_target_value(energy)
|
|
self.trans3rd.set_target_value(value)
|
|
|
|
|
|
def set_foils_combination(self, combination: list, wait=False):
|
|
assert len(combination) == 6, "Invalid combination entered"
|
|
|
|
for value in combination:
|
|
assert value in self.foils[0].values, "wrong value entered"
|
|
|
|
|
|
for i, (foil,value) in enumerate(zip(self.foils,combination)):
|
|
print(f"setting {repr(foil)} to position {value}")
|
|
foil.set_target_value(value)
|
|
|
|
if wait:
|
|
self.foils[0].wait_for_motion()
|
|
|
|
|
|
|
|
|
|
|
|
class Transmission(PVAdjustable):
|
|
|
|
def __init__(self, ID, third_order=False, check_motors=True, **kwargs):
|
|
#self.motors = motors
|
|
self.third_order = third_order
|
|
self.motors = [Motor(f"{ID}:MOTOR_{i}") for i in [1,2,3,4,5,6]]
|
|
|
|
self.check_motors = check_motors
|
|
|
|
prefix = ID + ":"
|
|
# Changed for new attenuator IOC
|
|
pvname_setvalue = prefix + "UsrRec.TD"
|
|
pvname_readback = prefix + ("UsrRec.TC3" if third_order else "UsrRec.TC1")
|
|
super().__init__(pvname_setvalue, pvname_readback, **kwargs)
|
|
|
|
self.pvnames.third_order_toggle = pvn = prefix + "UsrRec.HRM3"
|
|
self.pvs.third_order_toggle = PV(pvn)
|
|
|
|
|
|
pv_calculated = prefix + ("UsrRec.TR3" if third_order else "UsrRec.TR1")
|
|
pv_current = prefix + ("UsrRec.TC3" if third_order else "UsrRec.TC1")
|
|
|
|
self.calculated_trans = PVAdjustable(pv_calculated).get_current_value()
|
|
self.current_trans = PVAdjustable(pv_current).get_current_value()
|
|
|
|
def set_target_value(self, *args, **kwargs):
|
|
self.set_third_order_toggle()
|
|
t = super().set_target_value(*args, **kwargs)
|
|
self.wait_for_motion()
|
|
return t
|
|
|
|
def set_third_order_toggle(self):
|
|
self.pvs.third_order_toggle.put(self.third_order)
|
|
|
|
def wait_for_motion(self):
|
|
sleep(1) # to be certain that motion has started
|
|
while True:
|
|
if not self.is_moving():
|
|
break # motion is completed
|
|
sleep(0.25)
|
|
|
|
def is_moving(self):
|
|
if self.check_motors:
|
|
return any(m.is_moving() for m in self.motors)
|
|
|
|
else:
|
|
return (self.calculated_trans != self.current_trans)
|
|
|
|
|
|
|
|
class LimitedEnergy(PVAdjustable):
|
|
|
|
def __init__(self, ID, threshold, wait_time):
|
|
self.threshold = threshold
|
|
self.wait_time = wait_time
|
|
|
|
super().__init__(ID + ":UsrRec.ERY")
|
|
|
|
prefix = ID + ":"
|
|
self.pvnames.fel_energy = pvn = prefix + "ENY_CALC" # uses the new calculated energy from attenuator IOC
|
|
self.pvs.fel_energy = PV(pvn)
|
|
|
|
|
|
def set_target_value(self, value):
|
|
threshold = self.threshold
|
|
wait_time = self.wait_time
|
|
|
|
while not value:
|
|
value = self.pvs.fel_energy.get()
|
|
if value is not None:
|
|
if value >= threshold:
|
|
break
|
|
|
|
print(f"Machine photon energy ({value} eV) is below {threshold} eV - waiting for the machine to recover...")
|
|
value = None
|
|
sleep(wait_time)
|
|
|
|
super().set_target_value(value) # setting the energy is now instantanious, motors will still move afterwards.
|
|
print(f"Set attenuator energy to {value} eV")
|
|
|
|
|
|
class Foil(PVAdjustable):
|
|
|
|
def __init__(self, ID, number,check_motors=True):
|
|
""" Foil with possible settings of 0,1,2,3 or 4.
|
|
"""
|
|
|
|
assert 0 <= int(number) <= 6, "Invalid foil selected"
|
|
super().__init__(ID + f":UsrRec.FI{number}")
|
|
|
|
self.values = [0, 1, 2, 3, 4]
|
|
|
|
prefix = ID + ":"
|
|
|
|
self.motors = [Motor(f"{ID}:MOTOR_{i}") for i in [1,2,3,4,5,6]]
|
|
self.check_motors = check_motors
|
|
|
|
pv_calculated = prefix + "UsrRec.TR1"
|
|
pv_current = prefix + "UsrRec.TC1"
|
|
|
|
self.calculated_trans = PVAdjustable(pv_calculated).get_current_value()
|
|
self.current_trans = PVAdjustable(pv_current).get_current_value()
|
|
|
|
def set_target_value(self, *args, **kwargs):
|
|
t = super().set_target_value(*args, **kwargs).wait()
|
|
self.wait_for_motion()
|
|
return t
|
|
|
|
def wait_for_motion(self):
|
|
sleep(1) # to be certain that motion has started
|
|
while True:
|
|
if not self.is_moving():
|
|
break # motion is completed
|
|
sleep(0.25)
|
|
|
|
def is_moving(self):
|
|
if self.check_motors:
|
|
return any(m.is_moving() for m in self.motors)
|
|
|
|
else:
|
|
return (self.calculated_trans != self.current_trans)
|
|
|
|
|