Files
x07mb/script/local.py
2026-03-02 13:49:55 +01:00

264 lines
7.2 KiB
Python

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
import ntpath
#####################################################################
#
#
# PHOENIX SPECIFIC DEFINITIONS
#
#
######################################################################
class PH_class():
    """
    Phoenix utility class in local.py.

    Collection of some useful tools: conditional debug printing, creation
    of ChannelDouble objects and simple polling-based wait helpers.
    """

    def __init__(self):
        # pm() only prints while this flag is set.
        self.show = True

    def pm(self, pp, *args):
        """Print pp and the tuple of extra arguments if self.show is set."""
        if self.show:
            print(pp, args)

    def help(self):
        """Print the attribute dictionary of this instance."""
        print('PH.__dict__')
        print(self.__dict__)

    def init_ChannelDouble(self, alias, Ch_Epics, monitored=False):
        """
        Create a ChannelDouble, set its monitored flag and initialize it.

        alias     : first constructor argument of ChannelDouble
        Ch_Epics  : second constructor argument of ChannelDouble
                    (callers in this file pass an EPICS channel name)
        monitored : value assigned to the channel's 'monitored' attribute
        Returns the initialized channel object.
        """
        Ch = ChannelDouble(alias, Ch_Epics)
        Ch.monitored = monitored
        # Initialize exactly once: the flag is set before, so a second
        # set/initialize pair adds nothing.
        Ch.initialize()
        return Ch

    def initMA(self):
        """Initialize the default channels for the MA beamline."""
        self.ma_exitslit_rbv = self.init_ChannelDouble(
            'ma_exitslit_rbv', 'X07MA-OP-SL1SV1:TR_ISAP')

    def WaitForID(self):
        """
        Poll the module-level ID_DONE channel every 0.05 s until it reads
        'DONE', then sleep one additional second.
        """
        while ID_DONE.read() != 'DONE':
            print('wait for ID')
            time.sleep(0.05)
        time.sleep(1)

    def waitForValue(self, chan, val, comp, timeout=10):
        """
        Wait until abs(chan.read() - val) <= comp or until timeout expires.

        Workaround for the unclear operation of cawait and the different
        definitions of "wait" for different device types.

        chan    : channel to poll; only ch.psi.pshell.epics.ChannelDouble
                  (or a subclass) is polled, any other type returns at once
        val     : target value
        comp    : allowed absolute deviation from val
        timeout : maximum waiting time in seconds

        Returns None in every case, including on timeout.
        NOTE: a ChannelInteger variant was sketched in an earlier revision
        but is not implemented.
        """
        self.pm(type(chan))
        t0 = time.time()
        waiting = True
        if isinstance(chan, ch.psi.pshell.epics.ChannelDouble):
            # 'waiting' is tested first so that no further channel read is
            # issued once the timeout has expired.
            while waiting and (abs(chan.read() - val) > comp):
                self.pm('waitin', chan.getChannelName(), chan.read(), val,
                        waiting, time.time() - t0)
                time.sleep(.1)
                if time.time() - t0 > timeout:
                    waiting = False
        self.pm('DONE')
        return
# Instantiate the utility class
PH = PH_class()
PH.initMA()  # use only if PHOENIX II is operated

# Define some standard channels
ID_OFFSET = PH.init_ChannelDouble('ID_OFFSET', 'X07MA-ID:ENERGY-OFFS')
ID_ENERGY = PH.init_ChannelDouble('ID_ENERGY', 'X07MA-ID:ENERGY-READ')
ID_MODE = ChannelString('ID_MODE', 'X07MA-ID:MODE')
ID_MODE.initialize()
ID_DONE = ChannelString('ID_DONE', 'X07MA-ID:DONE')
ID_DONE.initialize()
ID_ALPHA = ChannelString('ID_ALPHA', 'X07MA-ID:ALPHA')
ID_ALPHA.initialize()
ma_exitslit_rbv = PH.init_ChannelDouble('ma_exitslit_rbv', 'X07MA-OP-SL1SV1:TR_ISAP')
ma_mono_m1_rbv = PH.init_ChannelDouble('ma_mono_m1_rbv', 'X07MA:m1.RBV')
# Created once only; a second identical definition was a duplicated line.
ma_mono_m2_rbv = PH.init_ChannelDouble('ma_mono_m2_rbv', 'X07MA:m2.RBV')
ES3_MF1_PRESSURE = PH.init_ChannelDouble('ES3_MF1_PRESSURE', 'X07MB-ES3-MF1:PRESSURE')

################ DEFINITIONS FOR OTF
# When True, wait_beam() returns without waiting for the beam.
NO_BEAM_CHECK = False
# When True, after_sample() raises on an ID error instead of waiting.
ABORT_ON_ID_ERROR = False

# Device initialization
ma_energy.setBlockingWrite(True)
id_pol_mode.setpoint.setBlockingWrite(True)
id_pol_offset.setBlockingWrite(True)
id_pol_angle.setBlockingWrite(True)

# Pseudo-devices
dev = ChannelDouble('Example', 'X07MB-ES-MA1:TRZ1.RBV')
dev.monitored = True
add_device(dev, True)
def otf(start, end, duration, delay=0.0, mode = None, offset = None, alpha = None, name = None):
    """
    Run the "EnergyScan_ma" script with the given scan parameters.

    start, end : passed to the script as E1 and E2
    duration   : passed as TIME
    delay      : converted to float and passed as DELAY
    mode       : passed as MODE
    offset     : passed as OFFSET
    alpha      : converted to float unless None, passed as ALPHA
    name       : passed as NAME; defaults to get_exec_pars().name and is
                 cut to its first 38 characters when longer
    """
    print('in otf',name)
    print('start, end,duration',start, end, duration)

    scan_name = get_exec_pars().name if name is None else name
    max_name_length = 38
    if len(scan_name) > max_name_length:
        scan_name = scan_name[:max_name_length]
        print('WARNING: Sample name too long. Name has been truncated.')

    print('in otf, call run \n')
    time.sleep(.5)

    scan_args = {
        "E1": start,
        "E2": end,
        "TIME": duration,
        "DELAY": float(delay),
        "MODE": mode,
        "OFFSET": offset,
        "NAME": scan_name,
        "ALPHA": None if alpha is None else float(alpha),
    }
    run("EnergyScan_ma", scan_args)
    print('in otf, after run \n')
def has_beam():
    """
    Return True unless sls_status.readback reads "Machine Down".
    """
    machine_state = sls_status.readback.read()
    return machine_state != "Machine Down"
def wait_beam():
    """
    Block until has_beam() reports beam, polling every 0.1 s.

    Returns immediately when beam is already present. When NO_BEAM_CHECK
    is set, a missing beam is only reported and not waited for.
    """
    if has_beam():
        return

    print("Waiting for beam... ")
    if NO_BEAM_CHECK:
        print("Maintenence mode: disregarding beam state")
        return

    while True:
        if has_beam():
            break
        sleep(0.1)
    print("Beam OK.")
def wait_pol_done(delay=1.0):
    """
    Block until caget("X07MA-ID:DONE") returns "DONE".

    delay : seconds to sleep before the first read, so that the value has
            had time to change

    The channel is re-read every 0.5 s. Read errors are reported and the
    polling continues.
    """
    print("Waiting for pol done")
    time.sleep(delay)  # Make sure value changed

    finished = False
    while not finished:
        try:
            finished = caget("X07MA-ID:DONE") == "DONE"
        except:
            # Best effort: any read failure is reported and retried.
            print("Error reading pol done")
        if not finished:
            time.sleep(0.5)
    print("Done")
def wait_channel(name, value, timeout=None, type=None, comparator=None):
print "Waiting " + str(name) + " = " + str(value),"... "
cawait(name, value, timeout = timeout, type=type, comparator=comparator)
print "waiting done"
def is_id_error():
    """
    Return True when id_error.read() equals 0.

    NOTE(review): a reading of 0 is treated as the error state - confirm
    against the convention of the underlying device.
    """
    reading = id_error.read()
    return reading == 0
def check_id_error():
    """
    Raise an Exception when is_id_error() reports an error; otherwise
    return None.
    """
    if not is_id_error():
        return
    raise Exception("ID error: check ID status")
def wait_id_ok():
    """
    If is_id_error() reports an error, block until it no longer does.

    The state is re-checked once per second. Failures of the re-check are
    reported and the waiting continues.
    """
    if not is_id_error():
        return

    print("ID error: waiting...")
    recovered = False
    while not recovered:
        time.sleep(1.0)  # Make sure value changed
        try:
            recovered = not is_id_error()
        except:
            # Best effort: keep waiting when the state cannot be read.
            print("Error reading id error")
    print("Done")
def set_pol(mode=None, alpha=None, offset=None):
if mode is not None:
print "Set pol mode: ", mode
id_pol_mode.write(mode)
if MODE == 'LINEAR':
if alpha is not None:
print "Set pol alpha: ", alpha
id_pol_angle.write(alpha)
wait_pol_done(1.0)
if offset is not None:
print "Set pol offset: ", offset
id_pol_offset.write(offset)
def set_energy_ma(value):
print "Setting energy ma to: ", value
ma_energy.write(float(E1)) # no need to add wait command. This commands sets and waits.
print "Done"
###################################################################################################
#Default scan callbacks
###################################################################################################
def before_sample(position, scan):
    """
    Default callback taking (position, scan); does nothing and returns None.
    """
    return None
def after_sample(record=None, scan=None):
    """
    Default callback taking (record, scan); checks for an ID error.

    With ABORT_ON_ID_ERROR set, check_id_error() is called (it raises on
    error) and True is returned. Otherwise, on an ID error the record is
    invalidated if one was given, the function waits in 1 s steps until
    the error clears, and False is returned. Without an error it returns
    True.
    """
    if ABORT_ON_ID_ERROR:
        check_id_error()
        return True

    if not is_id_error():
        return True

    if record is not None:
        record.invalidate()
    print("ID error, waiting...")
    while is_id_error():
        time.sleep(1.0)
    print("ID OK")
    return False