minor changes

This commit is contained in:
gac-x07mb
2024-12-20 15:48:06 +01:00
parent c888c22da2
commit 03789c34f4
11 changed files with 1389 additions and 49 deletions

View File

@ -25,6 +25,7 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
PSIDetectorBase,
)
logger = bec_logger.logger
DETECTOR_TIMEOUT = 5
@ -56,6 +57,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
"""
def on_stage(self) -> None:
"""
@ -93,28 +95,41 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
# check for falcon
falcon = self.parent.device_manager.devices.get("falcon", None) # get device
timeout = 1
timeout = 2
if falcon is not None:
# TODO Check that falcon.state.get() == 1 is the correct check.
# TODO Check that falcon.aquiring.get() == 1 is the correct check.
# --> When is the falcon acquiring, this assumes 1?
# self.wait_for_signals is defined in PSI_detector_base.CustomDetectorMixin
# ALSO check for enabled or not here!!!
#
if not self.wait_for_signals([(falcon.state.get, 1)], timeout=timeout):
if not self.wait_for_signals([(falcon.aquiring.get, 1)], timeout=timeout):
raise PhoenixTriggerError(
f"Device {self.parent.name} is not ready to take trigger, timeout due to waiting for Falcon to get ready. Timeout after {timeout}s"
)
xmap = self.parent.device_manager.devices.get("xmap", None) # get device
timeout = 1
timeout = 2
if xmap is not None:
# TODO Check that falcon.state.get() == 1 is the correct check.
# TODO Check that falcon.aquiring.get() == 1 is the correct check.
# --> When is the falcon acquiring, this assumes 1?
# self.wait_for_signals is defined in PSI_detector_base.CustomDetectorMixin
#
if not self.wait_for_signals([(xmap.state.get, 1)], timeout=timeout):
print('xmap erase start')
xmap.erase_start.set(1) # .. should be in phoenix_xmap?
print('xmap erase start')
print(xmap.acquiring.get())
time.sleep(0.1)
print(xmap.acquiring.get())
if not self.wait_for_signals([(xmap.acquiring.get, 1)], timeout=timeout):
raise PhoenixTriggerError(
f"Device {self.parent.name} is not ready to take trigger, timeout due to waiting for xmap to get ready. Timeout after {timeout}s"
)
@ -215,6 +230,8 @@ class PhoenixTrigger(PSIDetectorBase):
#
# (Here PhoenixTrigger <-- PSIDetectorBase <- Device
#
# intr_count = Cpt(
# EpicsSignal, "INTR-COUN
# then Cpt will construct - magically- the
# Epics channel name = prefix+suffix,
#

View File

@ -7,6 +7,7 @@
import enum
import os
import threading
import time
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
@ -70,6 +71,7 @@ class XMAPSetup(CustomDetectorMixin):
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
self._lock = threading.RLock()
@ -147,15 +149,17 @@ class XMAPSetup(CustomDetectorMixin):
Prepare detector and backend for acquisition
"""
# staging for XMAP as used in FDA
# staging for XMAP as used in FDA this is for step scan
# make sure that detector is ready to measure
self.parent.stop_all.set(1)
time.sleep(0.05)
self.parent.collect_mode.set(0)
time.sleep(0.05)
self.parent.apply(0)
self.parent.apply.set(0)
time.sleep(0.05)
self.parent.preset_real.set(0)
self.parent.preset_mode.set(0)
time.sleep(0.05)
# self.prepare_detector()
@ -327,18 +331,25 @@ class XMAPPhoenix(PSIDetectorBase, xMAP):
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 5
dxp1 = Cpt(EpicsDXP, "dxp1:")
# for fast loading uncomment dxp and mca for now
# dxp1 = Cpt(EpicsDXP, "dxp1:")
# dxp2 = Cpt(EpicsDXP, "dxp2:")
# dxp3 = Cpt(EpicsDXP, "dxp3:")
# dxp4 = Cpt(EpicsDXP, "dxp4:")
mca1 = Cpt(EpicsMCARecord, "mca1")
# mca1 = Cpt(EpicsMCARecord, "mca1")
# mca2 = Cpt(EpicsMCARecord, "mca2")
# mca3 = Cpt(EpicsMCARecord, "mca3")
# mca4 = Cpt(EpicsMCARecord, "mca4")
hdf5 = Cpt(HDF5Plugin, "HDF1:")
# for compatibility and mapping ruke EPICS --> bec
preset_real = xMAP.preset_real_time
preset_live = xMAP.preset_live_time
# create some easy to find names for frequently used channels
ph_erase_start = xMAP.erase_start

View File

@ -2,48 +2,14 @@
# creates newly
ff = 0W
ff = 0
falcon = 0
from ophyd import Component as Cpt
import phoenix_bec.devices.phoenix_xmap as ff
xmap = ff.XMAPPhoenix(name="xmap", prefix="X07MB-XMAP:")
# xmap = ff.FalconPhoenix(name="falcon_hdf5", prefix="X07MB-XMAP:")
# make a 'get to read all epics channels
# there will be an error message, if device contains a channel whcih does not exist
#w = xmap.read()
#HOw to test function in mixing layer
# phoenix_bec / local_scripts / Code_to_test_devices / test_falcon.py
# for attr in falcon.hdf5.component_names:
# if not attr.startswith("_"):
# print(attr)
# signal = getattr(falcon, attr)
# signal.get()
"""
print(this_scan.scan.data.keys())
for outer_key in this_scan.scan.data.keys():
print("outer_key", outer_key)
n_outer = len(this_scan.scan.data.keys())
for inner_key in this_scan.scan.data[outer_key].keys():
print("inner_key", inner_key)
# calculate nunber of points
n_inner = len(this_scan.scan.data[outer_key][inner_key].keys())
value = np.zeros(n_inner)
timestamp = np.zeros(n_inner)
for i in range(n_inner):
try:
value[i] = this_scan.scan.data[outer_key][inner_key][i]["value"]
except:
value = None
try:
timestamp[i] = this_scan.scan.data[outer_key][inner_key][i]["timestamp"]
except:
timestamp[i] = None
# endfor
self.add(inner_key + "_" + outer_key + "_val", value)
self.add(innerprint("test")
"""
xmap.custom_prepare.on_stage()

View File

@ -0,0 +1,522 @@
#! /usr/bin/env python
#
#
# old python library for pure python aplicatoins
#
# ===========================================================================
import os
import sys
import numpy as np
#from CaChannel import *
import epics as ep
#from EpicsMotor import *
#from epicsPV import *
import time
import string
def init_PV():
#
# routine initialzed all EPIC channels used in the Phoenix beamline
#
global PV
PV = {} # define EPICS CHANNEL (PV) directory
# define one channel
try:
PV['X07MB_MB_ScanX_VAL'] = ep.PV('X07MB-MA1_ScanX.VAL')
except:
print( "*** Error finding EPICS channel: ")
sys.exit("*** Program STOP ***")
print ('init PV done')
def get_epicsPV(channel):
b=-1
# first check fro channel existence
#print '.................................. enter get_epicsPV from X_X07MB_lib_dev.py' ,channel
#print('try ',channel)
try:
#print ' try reading channel X_X07MB_lib_dev.py' ,channel
b=PV[channel].getw()
except:
# channel not defined / found now created in PV list this channel into PV LIST
print ('trouble reading epics PV..',channel,'likely PV undefined')
n_tries=0
tt=0
while tt==0:
print ('in while loop ')
try:
print ('try============= ',channel)
PV[channel]=epicsPV(channel)
#print PV
tt=1
print (PV[channel].getw())
except:
#print 'set tt back to zero as something went wrong here'
tt=0
#print 'except somthing is wrong here, likely time out, now retry channels'
#print b
n_tries=n_tries+1
time.sleep(.3)
print( 'next try',n_tries)
if n_tries==25:
tt=1
print( 'too many tries for channel acces')
print( 'Channel',channel)
sys.exit("*** Program STOP ***")
# endif
# endtry
# now try again and set monitor to reduce chanel accesscalls
PV[channel].setMonitor()
b=PV[channel].getw()
# end while
# endtry
return b
def put_epicsPV(channel,val,delay={}):
print(channel,val,delay)
#try:
# from CaChannel import *
# from epicsMotor import *
# from epicsPV import *
# import time
# import string
#except:
# try:
# sys.path.insert(0, os.path.expandvars("$SLSBASE/sls/lib/python22/CaChannel"))
# sys.path.insert(0, os.path.expandvars("/exchange/share/mXAS/pyth/mod"))
# from CaChannel import *
# from epicsPV import *
# except:
# os.system ("xkbbell")
# os.system ("xmessage -nearmouse -timeout 30 \
# -buttons \"\" \"epicsPV or CaChannel module cannot be found\"")
# sys.exit(1)
# # endtry
# endtry
#print '------------------------------------------' , val
#print ' *delay) ', delay
#print ' *delay) '
#print ' locals',locals()
if delay == {}:
#print 'in if use predefined delay of 0 '
set_delay=0.0
else:
#print ' set delay to input value',delay
set_delay = delay
#endelse
n_tries=0
tt=0
b=-1
while tt==0:
try:
#print ' try writing into ' ,channel
epicsPV(channel).putw(val)
tt=1
except:
print( 'trouble reading epics PV..')
print( b)
n_tries=n_tries+1
tt=0
time.sleep(.5)
if n_tries==50:
tt=1
print ('too many tries for channel acces')
#sys.exit("*** Program STOP ***")
# endtry
# finally perform delay
time.sleep(set_delay)
return b
# ======================================================
def move_and_wait_bk(Ch_motor,value,backlash):
global phoenix_no_move
global phoenix_no_scan
# print ch_motor
if phoenix_no_move==1:
print (' ')
print ('in move_and_wait_bk ')
print ('should move motor ',Ch_motor,' to ', value)
print ('BL in no move debugging state. DO NOT MOVE MOTOR')
print (' ')
return
# move to new position with backlash correction
move_and_wait(Ch_motor,value-backlash)
move_and_wait(Ch_motor,value)
# ======================================================
def move_and_wait_old(Ch_motor,value,check_moving):
global phoenix_no_move
global phoenix_no_scan
# print 'in move and waqit',phoenix_no_move
# print Ch_motor
if phoenix_no_move==1:
print (' ')
print ('in move_and_wait ')
print ('should move motor ',Ch_motor,' to ', value)
print ('BL in no move debugging state. DO NOT MOVE MOTOR')
print (' ')
return
Ch_motor_use=Ch_motor
if (Ch_motor[len(Ch_motor)-4:len(Ch_motor)] == '.VAL'):
Ch_motor_use=Ch_motor[0:len(Ch_motor)-4]
# print Ch_motor_use
if Ch_motor_use[0:5] == 'X07MA':
stop_if_phoenix_is_offline()
if Ch_motor_use[0:11] == 'X07MB-OP-MI1':
stop_if_phoenix_is_offline()
Ch_motor_val = Ch_motor_use +'.VAL'
Ch_motor_done = Ch_motor_use +'.DMOV'
#print Ch_motor_val
epicsPV(Ch_motor_val).putw(value)
print( 'wait for movement of', Ch_motor_val)
while not epicsPV(Ch_motor_done).getw() :
time.sleep(0.1)
get_epicsPV(Ch_motor_val)
#print 'Motor ',Ch_motor_val,' still moving, wait '
print ( 'movement finished')
# ===========================================================
def move_and_wait(motor,value,check_moving=0):
global phoenix_no_move
global phoenix_no_scan
# print 'in move and waqit',phoenix_no_move
# print Ch_motor
if phoenix_no_move==1:
print( ' ')
print( 'in move_and_wait ')
print( 'should move motor ',Ch_motor,' to ', value)
print( 'BL in no move debugging state. DO NOT MOVE MOTOR')
print( ' ')
return
# some safety checks...
if motor.val[0:5] == 'X07MA':
stop_if_phoenix_is_offline()
if motor.val[0:11] == 'X07MB-OP-MI1':
stop_if_phoenix_is_offline()
put_epicsPV(motor.val,value,delay=0.2)
print ('wait for movement of', motor.val)
if check_moving == 0:
while not get_epicsPV(motor.dmov):
time.sleep(0.1)
print( get_epicsPV(motor.val))
print( 'Motor ',motor.val,' still moving, wait ')
#endwhile
else: # case with check of movement of encoders
print ('else....')
while not get_epicsPV(motor.dmov):
enc1= get_epicsPV(motor.rep)
time.sleep(0.25)
enc2= get_epicsPV(motor.rep)
diff=enc2-enc1
print( get_epicsPV(motor.val))
if (abs(diff) < 5 and (get_epicsPV(motor.dmov) == 0)) :
print ( 'suspect hanging of motor. monitor')
enc1= get_epicsPV(motor.rep)
time.sleep(0.5)
enc2= get_epicsPV(motor.rep)
diff=enc2-enc1
time.sleep(0.5)
if ((abs(diff) < 3) and (get_epicsPV(motor.dmov) == 0)):
print (' ===================================== diff',diff )
print ('difference encoder to motor too large')
print (' STOP movement')
put_epicsPV(motor.spmg,0,delay=0.1)
stop
# endif
#endif
print( 'Motor ',motor.val,' still moving, wait ')
print( 'difference subsequent encoder positions ',diff)
# endwhile
# endif
print( 'movement finished')
# =======================================================================
def Wait_for_Value(Channel,value):
while get_epicsPV(Channel) != value:
#print(Channel)
#print get_epicsPV(Channel)
time.sleep(.02)
#endwhile
def move_and_wait_2motors(mot1,mot2,val_1,val_2,check_moving=0):
global phoenix_no_move
global phoenix_no_scan
# print 'in move and waqit',phoenix_no_move
# print Ch_motor
if phoenix_no_move==1:
print( ' ')
print( 'in move_and_wait ')
print( 'should move motor ',Ch_motor,' to ', value)
print( 'BL in no move debugging state. DO NOT MOVE MOTOR')
print( ' ')
return
# some safety checks...
if mot1.val[0:5] == 'X07MA':
stop_if_phoenix_is_offline()
if mot2.val[0:5] == 'X07MA':
stop_if_phoenix_is_offline()
if mot1.val[0:11] == 'X07MB-OP-MI1':
stop_if_phoenix_is_offline()
if mot2.val[0:11] == 'X07MB-OP-MI1':
stop_if_phoenix_is_offline()
put_epicsPV(mot1.val,val_1,delay=0.2)
put_epicsPV(mot2.val,val_2,delay=0.2)
print( 'wait for movement of', mot1.val,'and ', mot2.val)
if check_moving == 0:
print (not get_epicsPV(mot1.dmov))
print (not get_epicsPV(mot2.dmov))
while ((not get_epicsPV(mot1.dmov)) or (not get_epicsPV(mot2.dmov))):
time.sleep(0.25)
#endwhile
print( get_epicsPV(mot1.val),get_epicsPV(mot2.val))
print( 'Motor ',mot1.val,mot2.val,' still moving, wait ')
else: # case with check of movement of encoders
print ( 'else....')
while (not get_epicsPV(mot1.dmov)) or (not get_epicsPV(mot2.dmov)):
#print ' in while '
# for motor 1
enc_1_a= get_epicsPV(mot1.rep)
enc_2_a= get_epicsPV(mot2.rep)
time.sleep(0.25)
enc_1_b= get_epicsPV(mot1.rep)
enc_2_b= get_epicsPV(mot2.rep)
diff_1=enc_1_b-enc_1_a
diff_2=enc_2_b-enc_2_a
#print ' encoder diffs',diff_1,diff_2
# check encoder movement for motor 1:
if (abs(diff_1) < 5 and (get_epicsPV(mot1.dmov) == 0)) :
print( 'suspect hanging of motor 1 monitor')
enc_1_a= get_epicsPV(mot1.rep)
time.sleep(0.25)
enc_1_b= get_epicsPV(mot1.rep)
diff_1=enc_1_b-enc_1_a
time.sleep(0.5)
if ((abs(diff_1) < 3) and (get_epicsPV(mot1.dmov) == 0)):
print( ' ===================================== diff',diff_1 )
print( 'difference subsequent encoder steps too small')
print( 'Motor 1 hanging???? STOP movement for safety ')
put_epicsPV(mot_1.spmg,0,delay=0.1)
stop
# endif
#endif
# Now checlk motor 2
if (abs(diff_2) < 5 and (get_epicsPV(mot2.dmov) == 0)) :
print ('suspect hanging of motor 2: monitor')
enc_2_a= get_epicsPV(mot2.rep)
time.sleep(0.25)
enc_2_b= get_epicsPV(mot2.rep)
diff_2=enc_2_b-enc_2_a
time.sleep(0.5)
if ((abs(diff_2) < 3) and (get_epicsPV(mot2.dmov) == 0)):
print( ' ===================================== diff',diff_2 )
print( 'difference subsequent encoder steps too small')
print( 'Motor 2 hanging???? STOP movement for safety ')
put_epicsPV(mot2.spmg,0,delay=0.1)
stop
# endif
#endif
print( 'Motors ',mot1.val, 'and',mot2.val,' still moving, wait ')
print( 'difference subsequent encoder positions ',diff_1, diff_2)
# endwhile
# endif
print( 'movement finished')
# =======================================================================
# Generate all names
class Motor():
#define extensions for all channels
e_val = '.VAL'
e_dval = '.DVAL'
e_rbv = '.RBV'
e_drbv = '.DRBV'
e_off ='.OFF'
e_twv = '.TWV'
e_twf = '.TWF'
e_twr = '.TWR'
e_jogr ='.JOGR' # jog rev
e_jogf ='.JOGF' # jog forw
e_jomf ='.HOMF' # home forward
e_jomr ='.HOMR' # home reverse
e_hlm ='.HLM' #High limit
e_llm ='.LLM' #lower limit
e_hls ='.HLS' #High limit
e_lls ='.LLS' #lower limit
e_dmov ='.DMOV' # moving done
e_movn ='.MOVN' # moving
e_spmg ='.SPMG' # stop pause move go (0,1,2,3)
e_ueip ='.UEIP' # encoder on / off
e_diff ='.DIFF'
e_rep = '.REP'
e_foff ='.FOFF' #FOFF (1 = Frozen / 0 = variable)
e_set = '.SET' # 0 = use 1 = set
e_mres = '.MRES'
e_eres = '.ERES'
def create_Ch(self,ch):
# this routine creates all channel names
self.val=ch+self.e_val
self.dval=ch+self.e_dval
self.rbv=ch+self.e_rbv
self.drbv=ch+self.e_drbv
self.off=ch+self.e_off
self.twv=ch+self.e_twv
self.twf=ch+self.e_twf
self.twr=ch+self.e_twr
self.jogr=ch+self.e_jogr
self.jogf=ch+self.e_jogf
self.hlm=ch+self.e_hlm
self.llm=ch+self.e_llm
self.hls=ch+self.e_hls
self.lls=ch+self.e_lls
self.dmov = ch+self.e_dmov
self.movn = ch+self.e_movn
self.spmg = ch+self.e_spmg
self.ueip = ch+self.e_ueip
self.rep = ch+self.e_rep
self.mres = ch+self.e_mres
self.eres = ch+self.e_eres
self.diff = ch+self.e_diff
self.foff = ch+self.e_foff
self.set = ch+self.e_set
# end create_Chnames
# endclass
def define_motor(ch):
g=Motor()
g.create_Ch(ch)
return g
# end define_motor
def define_motor_kb(ch):
g=Motor()
g.create_Ch(ch)
# set offset to variable as default setting
# this keeps the dval always constant
put_epicsPV(g.foff,1,delay=0.2) # offset to frozen as we do not initialize
put_epicsPV(g.set,0,delay=0.2) # make sure that coos are on use
return g
# end define_motor
def Next_Point():
sp1=np.zeros(2048)
sp2=np.zeros(2048)
sp3=np.zeros(2048)
sp4=np.zeros(2048)
d1 = 0
d2 = 0
d3 = 0
d4 = 0
#print('erasestart')
put_epicsPV('X07MB-XMAP:EraseStart',1,delay=.03)
#print('erasestart')
#time.sleep(.1)
put_epicsPV('X07MB-OP2:SMPL',1,delay=.03)
Wait_for_Value("X07MB-OP2:SMPL-DONE",1)
put_epicsPV('X07MB-XMAP:StopAll',1,delay=.03)
sp1_old=sp1
sp2_old=sp2
sp3_old=sp3
sp4_old=sp4
d1_old=d1
d2_old=d2
d3_old=d3
d4_old=d4
sp1=get_epicsPV('X07MB-XMAP:mca1.VAL')
sp2=get_epicsPV('X07MB-XMAP:mca2.VAL')
sp3=get_epicsPV('X07MB-XMAP:mca3.VAL')
sp4=get_epicsPV('X07MB-XMAP:mca4.VAL')
d1=get_epicsPV('X07MB-XMAP:mca1.DTIM')
d2=get_epicsPV('X07MB-XMAP:mca2.DTIM')
d3=get_epicsPV('X07MB-XMAP:mca3.DTIM')
d4=get_epicsPV('X07MB-XMAP:mca4.DTIM')
print('differences',max(sp1-sp1_old),max(sp2-sp2_old),max(sp3-sp3_old),max(sp4-sp4_old))
print('differences',d1-d1_old,d2-d2_old,d3-d3_old,d4-d4_old)
time.sleep(.5)
# end
init_PV()

View File

@ -0,0 +1,789 @@
#! /usr/bin/env python
#
#
# old python library for pure python aplicatoins
#
# ===========================================================================
# for now still use the old python installation .....
try:
from CaChannel import *
from epicsMotor import *
from epicsPV import *
import time
import string
except:
try:
sys.path.insert(0, os.path.expandvars("$SLSBASE/sls/lib/python22/CaChannel"))
sys.path.insert(0, os.path.expandvars("/exchange/share/mXAS/pyth/mod"))
from CaChannel import *
from epicsPV import *
except:
os.system ("xkbbell")
os.system ("xmessage -nearmouse -timeout 30 \
-buttons \"\" \"epicsPV or CaChannel module cannot be found\"")
sys.exit(1)
# endtry
#endtry
import numpy as np
def stop_if_phoenix_is_offline():
if (epicsPV('X07MB-OP-MI1:ONLINE').getw() <> 4) or ( epicsPV('X07MA-OP-CMU:ONLINE').getw() <>6 ):
print ' '
print 'PHOENIX NOT ONLINE ACCESS DENIED'
print 'Status PHOENIX ',epicsPV("X07MB-OP-MI1:ONLINE.VAL").getw()
print 'Status XTREME ',epicsPV("X07MA-OP-CMU:ONLINE.VAL").getw()
print ' '
print ' =========================================================== '
print ' '
print ' SCRIPT STOPPED '
print ' '
print ' ========================================================== '
print ' '
stop
# ======================================================
def init_PV():
#
# routine initialzed all EPIC channels used in the Phoenix beamline
#
global PV
PV = {} # define EPICS CHANNEL (PV) directory
# define one channel
try:
PV['X07MB-OP-MO:TC1'] = epicsPV('X07MB-OP-MO:TC1')
except:
print "*** Error finding EPICS channel: "
sys.exit("*** Program STOP ***")
print 'init PV done'
def get_epicsPV(channel):
b=-1
# first check fro channel existence
#print '.................................. enter get_epicsPV from X_X07MB_lib_dev.py' ,channel
#print('try ',channel)
try:
#print ' try reading channel X_X07MB_lib_dev.py' ,channel
b=PV[channel].getw()
except:
# channel not defined / found now created in PV list this channel into PV LIST
print 'trouble reading epics PV..',channel,'likely PV undefined'
n_tries=0
tt=0
while tt==0:
print 'in while loop '
try:
print 'try============= ',channel
PV[channel]=epicsPV(channel)
#print PV
tt=1
print PV[channel].getw()
except:
#print 'set tt back to zero as something went wrong here'
tt=0
#print 'except somthing is wrong here, likely time out, now retry channels'
#print b
n_tries=n_tries+1
time.sleep(.3)
print 'next try',n_tries
if n_tries==25:
tt=1
print 'too many tries for channel acces'
print 'Channel',channel
sys.exit("*** Program STOP ***")
# endif
# endtry
# now try again and set monitor to reduce chanel accesscalls
PV[channel].setMonitor()
b=PV[channel].getw()
# end while
# endtry
return b
def put_epicsPV(channel,val,delay={}):
print(channel,val,delay)
#try:
# from CaChannel import *
# from epicsMotor import *
# from epicsPV import *
# import time
# import string
#except:
# try:
# sys.path.insert(0, os.path.expandvars("$SLSBASE/sls/lib/python22/CaChannel"))
# sys.path.insert(0, os.path.expandvars("/exchange/share/mXAS/pyth/mod"))
# from CaChannel import *
# from epicsPV import *
# except:
# os.system ("xkbbell")
# os.system ("xmessage -nearmouse -timeout 30 \
# -buttons \"\" \"epicsPV or CaChannel module cannot be found\"")
# sys.exit(1)
# # endtry
# endtry
#print '------------------------------------------' , val
#print ' *delay) ', delay
#print ' *delay) '
#print ' locals',locals()
if delay == {}:
#print 'in if use predefined delay of 0 '
set_delay=0.0
else:
#print ' set delay to input value',delay
set_delay = delay
#endelse
n_tries=0
tt=0
b=-1
while tt==0:
try:
#print ' try writing into ' ,channel
epicsPV(channel).putw(val)
tt=1
except:
print 'trouble reading epics PV..'
print b
print b
n_tries=n_tries+1
tt=0
time.sleep(.5)
if n_tries==50:
tt=1
print 'too many tries for channel acces'
#sys.exit("*** Program STOP ***")
# endtry
# finally perform delay
time.sleep(set_delay)
return b
# ======================================================
def move_and_wait_bk(Ch_motor,value,backlash):
global phoenix_no_move
global phoenix_no_scan
# print ch_motor
if phoenix_no_move==1:
print ' '
print 'in move_and_wait_bk '
print 'should move motor ',Ch_motor,' to ', value
print 'BL in no move debugging state. DO NOT MOVE MOTOR'
print ' '
return
# move to new position with backlash correction
move_and_wait(Ch_motor,value-backlash)
move_and_wait(Ch_motor,value)
# ======================================================
def move_and_wait_old(Ch_motor,value,check_moving):
global phoenix_no_move
global phoenix_no_scan
# print 'in move and waqit',phoenix_no_move
# print Ch_motor
if phoenix_no_move==1:
print ' '
print 'in move_and_wait '
print 'should move motor ',Ch_motor,' to ', value
print 'BL in no move debugging state. DO NOT MOVE MOTOR'
print ' '
return
Ch_motor_use=Ch_motor
if (Ch_motor[len(Ch_motor)-4:len(Ch_motor)] == '.VAL'):
Ch_motor_use=Ch_motor[0:len(Ch_motor)-4]
# print Ch_motor_use
if Ch_motor_use[0:5] == 'X07MA':
stop_if_phoenix_is_offline()
if Ch_motor_use[0:11] == 'X07MB-OP-MI1':
stop_if_phoenix_is_offline()
Ch_motor_val = Ch_motor_use +'.VAL'
Ch_motor_done = Ch_motor_use +'.DMOV'
#print Ch_motor_val
epicsPV(Ch_motor_val).putw(value)
print 'wait for movement of', Ch_motor_val
while not epicsPV(Ch_motor_done).getw() :
time.sleep(0.1)
get_epicsPV(Ch_motor_val)
#print 'Motor ',Ch_motor_val,' still moving, wait '
print 'movement finished'
# ===========================================================
def move_and_wait(motor,value,check_moving=0):
global phoenix_no_move
global phoenix_no_scan
# print 'in move and waqit',phoenix_no_move
# print Ch_motor
if phoenix_no_move==1:
print ' '
print 'in move_and_wait '
print 'should move motor ',Ch_motor,' to ', value
print 'BL in no move debugging state. DO NOT MOVE MOTOR'
print ' '
return
# some safety checks...
if motor.val[0:5] == 'X07MA':
stop_if_phoenix_is_offline()
if motor.val[0:11] == 'X07MB-OP-MI1':
stop_if_phoenix_is_offline()
put_epicsPV(motor.val,value,delay=0.2)
print 'wait for movement of', motor.val
if check_moving == 0:
while not get_epicsPV(motor.dmov):
time.sleep(0.1)
print get_epicsPV(motor.val)
print 'Motor ',motor.val,' still moving, wait '
#endwhile
else: # case with check of movement of encoders
print 'else....'
while not get_epicsPV(motor.dmov):
print ' in while '
enc1= get_epicsPV(motor.rep)
time.sleep(0.25)
enc2= get_epicsPV(motor.rep)
diff=enc2-enc1
print get_epicsPV(motor.val)
if (abs(diff) < 5 and (get_epicsPV(motor.dmov) == 0)) :
print 'suspect hanging of motor. monitor'
enc1= get_epicsPV(motor.rep)
time.sleep(0.5)
enc2= get_epicsPV(motor.rep)
diff=enc2-enc1
time.sleep(0.5)
if ((abs(diff) < 3) and (get_epicsPV(motor.dmov) == 0)):
print ' ===================================== diff',diff
print 'difference encoder to motor too large'
print ' STOP movement'
put_epicsPV(motor.spmg,0,delay=0.1)
stop
# endif
#endif
print 'Motor ',motor.val,' still moving, wait '
print 'difference subsequent encoder positions ',diff
# endwhile
# endif
print 'movement finished'
# =======================================================================
def Wait_for_Value(Channel,value):
while get_epicsPV(Channel) <> value:
#print(Channel)
#print get_epicsPV(Channel)
time.sleep(.02)
#endwhile
def move_and_wait_2motors(mot1,mot2,val_1,val_2,check_moving=0):
global phoenix_no_move
global phoenix_no_scan
# print 'in move and waqit',phoenix_no_move
# print Ch_motor
if phoenix_no_move==1:
print ' '
print 'in move_and_wait '
print 'should move motor ',Ch_motor,' to ', value
print 'BL in no move debugging state. DO NOT MOVE MOTOR'
print ' '
return
# some safety checks...
if mot1.val[0:5] == 'X07MA':
stop_if_phoenix_is_offline()
if mot2.val[0:5] == 'X07MA':
stop_if_phoenix_is_offline()
if mot1.val[0:11] == 'X07MB-OP-MI1':
stop_if_phoenix_is_offline()
if mot2.val[0:11] == 'X07MB-OP-MI1':
stop_if_phoenix_is_offline()
put_epicsPV(mot1.val,val_1,delay=0.2)
put_epicsPV(mot2.val,val_2,delay=0.2)
print 'wait for movement of', mot1.val,'and ', mot2.val
if check_moving == 0:
print (not get_epicsPV(mot1.dmov))
print (not get_epicsPV(mot2.dmov))
while ((not get_epicsPV(mot1.dmov)) or (not get_epicsPV(mot2.dmov))):
time.sleep(0.25)
#endwhile
print get_epicsPV(mot1.val),get_epicsPV(mot2.val)
print 'Motor ',mot1.val,mot2.val,' still moving, wait '
else: # case with check of movement of encoders
print 'else....'
while (not get_epicsPV(mot1.dmov)) or (not get_epicsPV(mot2.dmov)):
#print ' in while '
# for motor 1
enc_1_a= get_epicsPV(mot1.rep)
enc_2_a= get_epicsPV(mot2.rep)
time.sleep(0.25)
enc_1_b= get_epicsPV(mot1.rep)
enc_2_b= get_epicsPV(mot2.rep)
diff_1=enc_1_b-enc_1_a
diff_2=enc_2_b-enc_2_a
#print ' encoder diffs',diff_1,diff_2
# check encoder movement for motor 1:
if (abs(diff_1) < 5 and (get_epicsPV(mot1.dmov) == 0)) :
print 'suspect hanging of motor 1 monitor'
enc_1_a= get_epicsPV(mot1.rep)
time.sleep(0.25)
enc_1_b= get_epicsPV(mot1.rep)
diff_1=enc_1_b-enc_1_a
time.sleep(0.5)
if ((abs(diff_1) < 3) and (get_epicsPV(mot1.dmov) == 0)):
print ' ===================================== diff',diff_1
print 'difference subsequent encoder steps too small'
print 'Motor 1 hanging???? STOP movement for safety '
put_epicsPV(mot_1.spmg,0,delay=0.1)
stop
# endif
#endif
# Now checlk motor 2
if (abs(diff_2) < 5 and (get_epicsPV(mot2.dmov) == 0)) :
print 'suspect hanging of motor 2: monitor'
enc_2_a= get_epicsPV(mot2.rep)
time.sleep(0.25)
enc_2_b= get_epicsPV(mot2.rep)
diff_2=enc_2_b-enc_2_a
time.sleep(0.5)
if ((abs(diff_2) < 3) and (get_epicsPV(mot2.dmov) == 0)):
print ' ===================================== diff',diff_2
print 'difference subsequent encoder steps too small'
print 'Motor 2 hanging???? STOP movement for safety '
put_epicsPV(mot2.spmg,0,delay=0.1)
stop
# endif
#endif
print 'Motors ',mot1.val, 'and',mot2.val,' still moving, wait '
print 'difference subsequent encoder positions ',diff_1, diff_2
# endwhile
# endif
print 'movement finished'
# =======================================================================
# Generate all names
class Motor():
#define extensions for all channels
e_val = '.VAL'
e_dval = '.DVAL'
e_rbv = '.RBV'
e_drbv = '.DRBV'
e_off ='.OFF'
e_twv = '.TWV'
e_twf = '.TWF'
e_twr = '.TWR'
e_jogr ='.JOGR' # jog rev
e_jogf ='.JOGF' # jog forw
e_jomf ='.HOMF' # home forward
e_jomr ='.HOMR' # home reverse
e_hlm ='.HLM' #High limit
e_llm ='.LLM' #lower limit
e_hls ='.HLS' #High limit
e_lls ='.LLS' #lower limit
e_dmov ='.DMOV' # moving done
e_movn ='.MOVN' # moving
e_spmg ='.SPMG' # stop pause move go (0,1,2,3)
e_ueip ='.UEIP' # encoder on / off
e_diff ='.DIFF'
e_rep = '.REP'
e_foff ='.FOFF' #FOFF (1 = Frozen / 0 = variable)
e_set = '.SET' # 0 = use 1 = set
e_mres = '.MRES'
e_eres = '.ERES'
def create_Ch(self,ch):
# this routine creates all channel names
self.val=ch+self.e_val
self.dval=ch+self.e_dval
self.rbv=ch+self.e_rbv
self.drbv=ch+self.e_drbv
self.off=ch+self.e_off
self.twv=ch+self.e_twv
self.twf=ch+self.e_twf
self.twr=ch+self.e_twr
self.jogr=ch+self.e_jogr
self.jogf=ch+self.e_jogf
self.hlm=ch+self.e_hlm
self.llm=ch+self.e_llm
self.hls=ch+self.e_hls
self.lls=ch+self.e_lls
self.dmov = ch+self.e_dmov
self.movn = ch+self.e_movn
self.spmg = ch+self.e_spmg
self.ueip = ch+self.e_ueip
self.rep = ch+self.e_rep
self.mres = ch+self.e_mres
self.eres = ch+self.e_eres
self.diff = ch+self.e_diff
self.foff = ch+self.e_foff
self.set = ch+self.e_set
# end create_Chnames
# endclass
def define_motor(ch):
g=Motor()
g.create_Ch(ch)
return g
# end define_motor
def define_motor_kb(ch):
g=Motor()
g.create_Ch(ch)
# set offset to variable as default setting
# this keeps the dval always constant
put_epicsPV(g.foff,1,delay=0.2) # offset to frozen as we do not initialize
put_epicsPV(g.set,0,delay=0.2) # make sure that coos are on use
return g
# end define_motor
def init_one_motor(init_motor,HHL=0,LHL=0,LIM='low',set_offset='variable'):
# initialization for givrn location of limit switch (1 motor)
motor=define_motor_kb(init_motor)
total_range = abs(HHL-LHL)
# general settings
if set_offset == 'frozen':
put_epicsPV(motor.foff,1,delay=0.2) # offset to frozen as we do not initialize
else:
put_epicsPV(motor.foff,0,delay=0.2) # offset to variable if needed
#endif
put_epicsPV(motor.twv,0.1,delay=0.2) # set tweak value to 100 mum as default
put_epicsPV(motor.ueip , 0 ,delay=.5) # switch encoder off
# first remember start value
x_ini=get_epicsPV(motor.val)
print x_ini
# now remove soft limits allow start value plus total range as maximum
put_epicsPV(motor.hlm ,x_ini+ total_range*1.15 ,delay=.5) # switch encoder off
put_epicsPV(motor.llm ,x_ini-total_range*1.15 ,delay=.5) # switch encoder off
# now move to lower limit
if LIM == 'low':
move_and_wait(motor ,x_ini-total_range*1.07,check_moving=1)
# now check whether we are in the limit
if get_epicsPV(motor.lls) == 0:
print(' hard limit not reached',motor.val)
stop
else:
print(' hard limit reached',motor.val)
#endelse
# now set coordinate system according to limits
# endcoder on / off twice to account for bug in motor record
put_epicsPV(motor.ueip ,1,delay=.5) # encoder on
put_epicsPV(motor.ueip ,0,delay=.5) # encoder on
put_epicsPV(motor.ueip ,1,delay=.5) # encoder on
# widen soft limits temporally around aimed position
# in case the correct value is outside the current soft limits
put_epicsPV(motor.llm , LHL-10000 , delay=1) # st lower soft limit according to table
put_epicsPV(motor.hlm , LHL+10000 , delay=1) # st higher soft limit according to table
put_epicsPV(motor.set ,1,delay=1) # now allow change of coo syste cal to set m
put_epicsPV(motor.val ,LHL,delay=1) # set current position to lower limit
put_epicsPV(motor.set ,0,delay=0.5) # set co system to use
# finally drive motor out of hard limits and set the correct soft limits
put_epicsPV(motor.twv,0.1,delay=1.) # set tweak value to 100 mum as default
put_epicsPV(motor.twf ,1,delay=2.) # tweak out of hard limit
put_epicsPV(motor.llm , LHL+0.05 , delay=0.5) # st lower soft limit according to table
put_epicsPV(motor.hlm , HHL-0.05 , delay=0.5) # st higher soft limit according to table
#endif
if LIM == 'high':
move_and_wait(motor ,x_ini+total_range*1.07,check_moving=1) # move to high limits
# now check whether we are in the limit
if get_epicsPV(motor.hls) == 0:
print(' hard limit not reached',motor.val)
stop
else:
print(' hard limit reached',motor.val)
#endelse
# now set coordinate system according to limits
put_epicsPV(motor.ueip ,1,delay=.5) # encoder on
# widen soft limits temporally around aimed position
# in case the correct value is outside the current soft limits
put_epicsPV(motor.llm , LHL-10000 , delay=1) # st lower soft limit according to table
put_epicsPV(motor.hlm , LHL+10000 , delay=1) # st higher soft limit according to table
put_epicsPV(motor.set ,1,delay=0.5) # now allow change of coo syste cal to set m
put_epicsPV(motor.val ,HHL,delay=0.5) # set current position to lower limit
put_epicsPV(motor.set ,0,delay=0.5) # set coo system to use
# finally set the soft limits
put_epicsPV(motor.twv,0.1,delay=0.5) # set tweak value to 100 mum as default
put_epicsPV(motor.twr ,1,delay=2.) # tweak out of hard limit to lower values
put_epicsPV(motor.llm , LHL+0.05 , delay=0.2) # st lower soft limit according to table
put_epicsPV(motor.hlm , HHL-0.05 , delay=0.2) # st higher soft limit according to table
#endif
#end
def init_two_motors(mot1=' ',mot2=' ',HHL_1=0,LHL_1=0,HHL_2=0,LHL_2=0,LIM='low',set_offset='variable'):
# initialization for givrn location of limit switch
# initializes 2 motors with simultaneous movemnets into the same direction
#
motor_1=define_motor_kb(mot1)
motor_2=define_motor_kb(mot2)
total_range_1 = abs(HHL_1-LHL_1)
total_range_2 = abs(HHL_2-LHL_2)
# general settings
if set_offset == 'frozen':
put_epicsPV(motor_1.foff,1,delay=0.2) # offset to frozen as we do not initialize
put_epicsPV(motor_2.foff,1,delay=0.2) # offset to frozen as we do not initialize
else:
put_epicsPV(motor_1.foff,0,delay=0.2) # offset to variable if needed
put_epicsPV(motor_2.foff,0,delay=0.2) # offset to variable if needed
#endif
put_epicsPV(motor_1.twv,0.1,delay=0.2) # set tweak value to 100 mum as default
put_epicsPV(motor_2.twv,0.1,delay=0.2) # set tweak value to 100 mum as default
put_epicsPV(motor_1.ueip , 0 ,delay=.5) # switch encoder off
put_epicsPV(motor_2.ueip , 0 ,delay=.5) # switch encoder off
# first remember start values
x_ini_1=get_epicsPV(motor_1.val)
x_ini_2=get_epicsPV(motor_2.val)
print x_ini_1,x_ini_2
# now remove soft limits allow start value plus total range as maximum
put_epicsPV(motor_1.hlm ,x_ini_1 + total_range_1*1.15 ,delay=.5) # switch encoder off
put_epicsPV(motor_1.llm ,x_ini_1 - total_range_1*1.15 ,delay=.5) # switch encoder off
put_epicsPV(motor_2.hlm ,x_ini_2 + total_range_2*1.15 ,delay=.5) # switch encoder off
put_epicsPV(motor_2.llm ,x_ini_2 - total_range_2*1.15 ,delay=.5) # switch encoder off
# now move to lower limit
if LIM == 'low':
move_and_wait_2motors(motor_1,motor_2 ,x_ini_1-total_range_1*1.07,x_ini_2-total_range_2*1.07,check_moving=1)
# now check whether we are in the limit
if ((get_epicsPV(motor_1.lls) == 0 ) or (get_epicsPV(motor_2.lls) == 0 )) :
print(' hard limit not reached',motor_1.val)
print(' hard limit not reached',motor_2.val)
stop
else:
print(' hard limit reached',motor_1.val)
print(' hard limit reached',motor_2.val)
#endelse
# Now both motors are in the hard limit
# now set coordinate system according to limits
# endcoder on / off twice to account for bug in motor record
put_epicsPV(motor_1.ueip ,1,delay=.5) # encoder on
put_epicsPV(motor_1.ueip ,0,delay=.5) # encoder on
put_epicsPV(motor_1.ueip ,1,delay=.5) # encoder on
put_epicsPV(motor_2.ueip ,1,delay=.5) # encoder on
put_epicsPV(motor_2.ueip ,0,delay=.5) # encoder on
put_epicsPV(motor_2.ueip ,1,delay=.5) # encoder on
# widen soft limits temporally around aimed position
# in case the correct value is outside the current soft limits
put_epicsPV(motor_1.llm , LHL_1-10000 , delay=1) # st lower soft limit according to table
put_epicsPV(motor_1.hlm , LHL_1+10000 , delay=1) # st higher soft limit according to table
put_epicsPV(motor_2.llm , LHL_2-10000 , delay=1) # st lower soft limit according to table
put_epicsPV(motor_2.hlm , LHL_2+10000 , delay=1) # st higher soft limit according to table
put_epicsPV(motor_1.set ,1,delay=1) # now allow change of coo syste cal to set m
put_epicsPV(motor_2.set ,1,delay=1) # now allow change of coo syste cal to set m
put_epicsPV(motor_1.val ,LHL_1,delay=1) # set current position to lower limit
put_epicsPV(motor_2.val ,LHL_2,delay=1) # set current position to lower limit
put_epicsPV(motor_1.set ,0,delay=0.5) # set co system to use
put_epicsPV(motor_2.set ,0,delay=0.5) # set co system to use
# finally drive motor out of hard limits and set the correct soft limits
put_epicsPV(motor_1.twv,0.1,delay=1.) # set tweak value to 100 mum as default
put_epicsPV(motor_1.twf ,1,delay=2.) # tweak out of hard limit
put_epicsPV(motor_1.llm , LHL_1+0.05 , delay=0.5) # st lower soft limit according to table
put_epicsPV(motor_1.hlm , HHL_1-0.05 , delay=0.5) # st higher soft limit according to table
put_epicsPV(motor_2.twv,0.1,delay=1.) # set tweak value to 100 mum as default
put_epicsPV(motor_2.twf ,1,delay=2.) # tweak out of hard limit
put_epicsPV(motor_2.llm , LHL_2+0.05 , delay=0.5) # st lower soft limit according to table
put_epicsPV(motor_2.hlm , HHL_2-0.05 , delay=0.5) # st higher soft limit according to table
#endif
if LIM == 'high':
print ' not implemented yet'
stop
move_and_wait(motor ,x_ini+total_range*1.07,check_moving=1) # move to high limits
# now check whether we are in the limit
if get_epicsPV(motor.hls) == 0:
print(' hard limit not reached',motor.val)
stop
else:
print(' hard limit reached',motor.val)
#endelse
# now set coordinate system according to limits
put_epicsPV(motor.ueip ,1,delay=.5) # encoder on
# widen soft limits temporally around aimed position
# in case the correct value is outside the current soft limits
put_epicsPV(motor.llm , LHL-10000 , delay=1) # st lower soft limit according to table
put_epicsPV(motor.hlm , LHL+10000 , delay=1) # st higher soft limit according to table
put_epicsPV(motor.set ,1,delay=0.5) # now allow change of coo syste cal to set m
put_epicsPV(motor.val ,HHL,delay=0.5) # set current position to lower limit
put_epicsPV(motor.set ,0,delay=0.5) # set coo system to use
# finally set the soft limits
put_epicsPV(motor.twv,0.1,delay=0.5) # set tweak value to 100 mum as default
put_epicsPV(motor.twr ,1,delay=2.) # tweak out of hard limit to lower values
put_epicsPV(motor.llm , LHL+0.05 , delay=0.2) # st lower soft limit according to table
put_epicsPV(motor.hlm , HHL-0.05 , delay=0.2) # st higher soft limit according to table
#endif
#end
def Next_Point():
sp1=np.zeros(2048)
sp2=np.zeros(2048)
sp3=np.zeros(2048)
sp4=np.zeros(2048)
d1 = 0
d2 = 0
d3 = 0
d4 = 0
#print('erasestart')
put_epicsPV('X07MB-XMAP:EraseStart',1,delay=.03)
#print('erasestart')
#time.sleep(.1)
put_epicsPV('X07MB-OP2:SMPL',1,delay=.03)
Wait_for_Value("X07MB-OP2:SMPL-DONE",1)
put_epicsPV('X07MB-XMAP:StopAll',1,delay=.03)
sp1_old=sp1
sp2_old=sp2
sp3_old=sp3
sp4_old=sp4
d1_old=d1
d2_old=d2
d3_old=d3
d4_old=d4
sp1=get_epicsPV('X07MB-XMAP:mca1.VAL')
sp2=get_epicsPV('X07MB-XMAP:mca2.VAL')
sp3=get_epicsPV('X07MB-XMAP:mca3.VAL')
sp4=get_epicsPV('X07MB-XMAP:mca4.VAL')
d1=get_epicsPV('X07MB-XMAP:mca1.DTIM')
d2=get_epicsPV('X07MB-XMAP:mca2.DTIM')
d3=get_epicsPV('X07MB-XMAP:mca3.DTIM')
d4=get_epicsPV('X07MB-XMAP:mca4.DTIM')
print('differences',max(sp1-sp1_old),max(sp2-sp2_old),max(sp3-sp3_old),max(sp4-sp4_old))
print('differences',d1-d1_old,d2-d2_old,d3-d3_old,d4-d4_old)
time.sleep(.5)
# end
init_PV()
N=10000000
for i in range(N):
Next_Point()
#endwhile

View File

@ -0,0 +1,33 @@
import time
import MyLib as ML
channels=['X07MB-MA:ScanX.RBV']
import epics as ep
def init_PV(channels):
PV = []
for i in range(len(channels)):
print('init',i,)
PV.append(ep.camonitor(channels[i]))
#endfor
return PV
#end
def read_all(channels):
res=[]
for i in range(len(channels)):
print('read',i)
res.append(PV[i].get())
#endfor
#end
PV=init_PV(channels)
n_ch=len(channels)
print(PV)
asdasd
for i in range(n_ch):
print(i)
w=read_all(channels)
print(w)
#endfor

View File

@ -0,0 +1 @@
camon X07MB-ES-MA1:ScanX.RBV

View File

@ -0,0 +1 @@
camon X07MB-ES1-MA1:ScanX.RBV

View File

@ -36,7 +36,7 @@ import sys
phoenix.create_base_config()
phoenix.add_xmap()
dev.MA1_ScanX.enabled = True
dev.xmap.enabled=True
print("---------------------------------")
# scan will not diode