################################################################################################### # Deployment specific global definitions - executed after startup.py ################################################################################################### import ntpath ##################################################################### # # # PHOENIX SPECIFIC DEFINITIONS # # ###################################################################### class PH_class(): """ Phoenix utility class in local.py Collection of soem usefull tools """ def __init__(self): self.show=True def pm(self,pp,*args): if self.show==True: print(pp,args) def help(self): print('PH.__dict__') print(self.__dict__ ) def init_ChannelDouble(self,alias,Ch_Epics,monitored=False): Ch=ChannelDouble(alias,Ch_Epics) Ch.monitored=monitored Ch.initialize() Ch.monitored=monitored Ch.initialize() return Ch def initMA(self): # initialize default channels forma beamline self.ma_exitslit_rbv=self.init_ChannelDouble('ma_exitslit_rbv','X07MA-OP-SL1SV1:TR_ISAP') #self.ma_exitslit_rbv=self.init_ChannelDouble('id_offset','X07MA-ID:ENERGY-OFFS') def WaitForID(self): while ID_DONE.read() != 'DONE': print('wait for ID') time.sleep(0.05) time.sleep(1) def waitForValue(self,chan,val,comp,timeout=10): """ waitForValue, waits until a value is reaced within workarond of unclear operation of cawait and different definiton of wait fior different device types chan.read()-val)>comp checks for class currently works for ch.psi.pshell.epics.ChannelDouble only """ self.pm(type(chan)) t0=time.time() waiting=True if type(chan) == ch.psi.pshell.epics.ChannelDouble: while (abs(chan.read()-val)>comp) and waiting : self.pm('waitin',chan.getChannelName(),chan.read(),val,waiting,time.time()-t0) time.sleep(.1) if time.time()-t0 > timeout: waiting=False #endif #endwhile #endif #if type(chan)==ch.psi.pshell.epics.ChannelInteger: # while chan.read() == val: # self.pm('still waiting',chan.read(),) # time.sleep(.1) # #endwhile ##endif self.pm('DONE') return # initit class PH=PH_class() PH.initMA() # use only if PHOENIX II is operated # define some stadard channels ID_OFFSET=PH.init_ChannelDouble('ID_OFFSET','X07MA-ID:ENERGY-OFFS') ID_ENERGY=PH.init_ChannelDouble('ID_ENERGY','X07MA-ID:ENERGY-READ') ID_MODE=ChannelString('ID_MODE','X07MA-ID:MODE') ID_MODE.initialize() ID_DONE=ChannelString('ID_DONE','X07MA-ID:DONE') ID_DONE.initialize() ID_ALPHA=ChannelString('ID_ALPHA','X07MA-ID:ALPHA') ID_ALPHA.initialize() ma_exitslit_rbv=PH.init_ChannelDouble('ma_exitslit_rbv','X07MA-OP-SL1SV1:TR_ISAP') ma_mono_m1_rbv=PH.init_ChannelDouble('ma_mono_m1_rbv','X07MA:m1.RBV') ma_mono_m2_rbv=PH.init_ChannelDouble('ma_mono_m2_rbv','X07MA:m2.RBV') ma_mono_m2_rbv=PH.init_ChannelDouble('ma_mono_m2_rbv','X07MA:m2.RBV') ES3_MF1_PRESSURE=PH.init_ChannelDouble('ES3_MF1_PRESSURE','X07MB-ES3-MF1:PRESSURE') ################ DEFINITIONS FOR OTF NO_BEAM_CHECK = False ABORT_ON_ID_ERROR = False #Device initialization ma_energy.setBlockingWrite(True) id_pol_mode.setpoint.setBlockingWrite(True) id_pol_offset.setBlockingWrite(True) id_pol_angle.setBlockingWrite(True) #Pseudo-devices dev=ChannelDouble('Example','X07MB-ES-MA1:TRZ1.RBV') dev.monitored=True add_device(dev, True) def otf(start, end, duration, delay=0.0, mode = None, offset = None, alpha = None, name = None): """ """ print('in otf',name) print('start, end,duration',start, end, duration) if name is None: name = get_exec_pars().name if len(name)> 38: name = name[:38] print('WARNING: Sample name too long. Name has been truncated.') print('in otf, call run \n') time.sleep(.5) run("EnergyScan_ma", {"E1":start, "E2":end, "TIME":duration, "DELAY":float(delay), "MODE":mode, "OFFSET":(offset), "NAME":name, "ALPHA":float(alpha) if alpha is not None else None}) print('in otf, after run \n') def has_beam(): """ """ return sls_status.readback.read() !="Machine Down" def wait_beam(): """ """ if not has_beam(): print "Waiting for beam... " if NO_BEAM_CHECK: print "Maintenence mode: disregarding beam state" return while not has_beam(): sleep(0.1) print "Beam OK." def wait_pol_done(delay=1.0): """ """ print "Waiting for pol done" time.sleep(delay) #Make sure value changed while True: try: if caget("X07MA-ID:DONE") == "DONE": break except: print "Error reading pol done" time.sleep(0.5) print "Done" def wait_channel(name, value, timeout=None, type=None, comparator=None): print "Waiting " + str(name) + " = " + str(value),"... " cawait(name, value, timeout = timeout, type=type, comparator=comparator) print "waiting done" def is_id_error(): return (id_error.read()==0) def check_id_error(): if is_id_error(): raise Exception ("ID error: check ID status") def wait_id_ok(): """ """ if is_id_error(): print "ID error: waiting..." while True: time.sleep(1.0) #Make sure value changed try: if not is_id_error(): break except: print "Error reading id error" print "Done" def set_pol(mode=None, alpha=None, offset=None): if mode is not None: print "Set pol mode: ", mode id_pol_mode.write(mode) if MODE == 'LINEAR': if alpha is not None: print "Set pol alpha: ", alpha id_pol_angle.write(alpha) wait_pol_done(1.0) if offset is not None: print "Set pol offset: ", offset id_pol_offset.write(offset) def set_energy_ma(value): print "Setting energy ma to: ", value ma_energy.write(float(E1)) # no need to add wait command. This commands sets and waits. print "Done" ################################################################################################### #Default scan callbacks ################################################################################################### def before_sample(position, scan): pass def after_sample(record=None, scan=None): if ABORT_ON_ID_ERROR: check_id_error() return True if is_id_error(): if (record is not None): record.invalidate() print "ID error, waiting..." while is_id_error(): time.sleep(1.0) print "ID OK" return False return True