343 lines
11 KiB
Python
343 lines
11 KiB
Python
###################################################################################################
|
|
# Deployment specific global definitions - executed after startup.py
|
|
###################################################################################################
|
|
|
|
|
|
|
|
###################################################################################################
|
|
# Device initialization
|
|
###################################################################################################
|
|
class Energy(PositionerBase):
|
|
def __init__(self, name, config):
|
|
ControlledVariableBase.__init__(self, name, photon_energy.config)
|
|
self.setReadback(photon_energy.getReadback())
|
|
|
|
def doRead(self):
|
|
return photon_energy.read()
|
|
|
|
def doWrite(self, val):
|
|
if not photon_energy.isInPosition(val):
|
|
change_photon_pars(_photon_energy=val)
|
|
|
|
def getMinValue(self):
|
|
er=get_energy_range()
|
|
return er[0] if er is not None else sys.maxint
|
|
|
|
def getMaxValue(self):
|
|
er=get_energy_range()
|
|
return er[1] if er is not None else -sys.maxint
|
|
add_device(Energy("energy", None), True)
|
|
|
|
def get_energy_range():
|
|
md,gr = id_mode.read(), grating.read()
|
|
if gr.startswith("G1"):
|
|
if md in ["CIRC+", "CIRC-", "LH"]:
|
|
return (20,360)
|
|
if md in ["LV"]:
|
|
return (200,360)
|
|
if md in ["LV low E"]:
|
|
return (40,120)
|
|
if md in ["LHQ"]:
|
|
return (20,200)
|
|
if md in ["OFF"]:
|
|
return (20,50)
|
|
elif gr.startswith("G2"):
|
|
if md in ["CIRC+", "CIRC-", "LH"]:
|
|
return (80,800)
|
|
if md in ["LV"]:
|
|
return (200,800)
|
|
if md in ["LV low E"]:
|
|
return (80,120)
|
|
if md in ["LHQ"]:
|
|
return (80,200)
|
|
elif gr.startswith("NIM"):
|
|
if md in ["CIRC+", "CIRC-"]:
|
|
return (20,30)
|
|
if md in ["LH", "LHQ"]:
|
|
return (10,30)
|
|
return None
|
|
|
|
|
|
class Cff(PositionerBase):
|
|
def __init__(self, name, config):
|
|
ControlledVariableBase.__init__(self, name, pgm_cff.config)
|
|
self.setReadback(pgm_cff.getReadback())
|
|
|
|
def doRead(self):
|
|
return pgm_cff.read()
|
|
|
|
def doWrite(self, val):
|
|
if not pgm_cff.isInPosition(val):
|
|
change_photon_pars(_cff=val)
|
|
|
|
def getMinValue(self):
|
|
return pgm_cff.getMinValue()
|
|
|
|
def getMaxValue(self):
|
|
return pgm_cff.getMaxValue()
|
|
|
|
add_device(Cff("cff", None), True)
|
|
|
|
class FeSettlingCondition(SettlingCondition):
|
|
def doWait(self):
|
|
time.sleep(0.5)
|
|
cawait("X09LA-FE-SV1:TR1.DMOV", 1)
|
|
cawait("X09LA-FE-SV1:TR2.DMOV", 1)
|
|
cawait("X09LA-FE-SH1:TR1.DMOV", 1)
|
|
cawait("X09LA-FE-SH1:TR2.DMOV", 1)
|
|
|
|
class IdSettlingCondition(SettlingCondition):
|
|
def doWait(self):
|
|
time.sleep(0.5)
|
|
id_status.waitValue("done", -1)
|
|
|
|
class GrSettlingCondition(SettlingCondition):
|
|
def doWait(self):
|
|
time.sleep(0.5)
|
|
cawait("X09LA:m4.DMOV", 1)
|
|
|
|
class CffSettlingCondition(SettlingCondition):
|
|
def doWait(self):
|
|
caput("X09LA-PGM:setE.PROC",1)
|
|
time.sleep(0.5)
|
|
cawait("X09LA:m1.DMOV", 1)
|
|
cawait("X09LA:m2.DMOV", 1)
|
|
|
|
|
|
#fe_state.getSetpoint().setBlockingWrite(True)
|
|
fe_state.setSettlingCondition(FeSettlingCondition())
|
|
oper_mode.getSetpoint().setBlockingWrite(True)
|
|
photon_energy.setSettlingCondition(IdSettlingCondition())
|
|
id_mode.setSettlingCondition(IdSettlingCondition())
|
|
grating.setSettlingCondition(GrSettlingCondition())
|
|
pgm_cff.setSettlingCondition(CffSettlingCondition())
|
|
|
|
|
|
def change_photon_pars(_photon_energy=None, _id_mode=None, _grating=None, _cff=None):
|
|
if (_photon_energy is not None) and (photon_energy.isInPosition(_photon_energy)):
|
|
_photon_energy = None
|
|
if _id_mode == id_mode.read():
|
|
_id_mode=None
|
|
if _grating == grating.read():
|
|
_grating=None
|
|
if _cff == pgm_cff.read():
|
|
_cff=None
|
|
if _photon_energy == _id_mode == _grating == _cff == None:
|
|
return
|
|
log("Changing photon parameters: photon_energy=" + str(_photon_energy) + " id_mode=" + str(_id_mode) + " grating=" + str(_grating) + " cff=" + str(_cff))
|
|
|
|
initial_fe_state = fe_state.read()
|
|
initial_oper_mode = oper_mode.read()
|
|
initial_v = fe_vert_width.read();
|
|
initial_h = fe_horiz_width.read()
|
|
|
|
try:
|
|
#Set front end to “Closed” state
|
|
fe_state.move("Closed")
|
|
|
|
#Set operation mode to “PGM”
|
|
oper_mode.move("PGM")
|
|
|
|
|
|
if _id_mode is not None:
|
|
#Set polarization mode
|
|
print "Setting id_mode="+str(_id_mode)
|
|
id_mode.move("OFF")
|
|
id_mode.move(_id_mode)
|
|
|
|
if _grating is not None:
|
|
#Set grating
|
|
print "Setting grating="+str(_grating)
|
|
grating.move(_grating)
|
|
|
|
if _photon_energy is not None:
|
|
#Set insertion device energy
|
|
print "Setting photon_energy="+str(_photon_energy)
|
|
photon_energy.move(_photon_energy)
|
|
|
|
if _cff is not None:
|
|
#Set cff
|
|
print "Setting cff="+str(_cff)
|
|
pgm_cff.move(_cff)
|
|
|
|
finally:
|
|
#Return operation mode to original value (e.g. “PGM+ID”)
|
|
oper_mode.move(initial_oper_mode)
|
|
|
|
#Return front end to original state (e.g. “2x2”)
|
|
fe_state.move(initial_fe_state)
|
|
#IF (initial_h, initial_v) does not correspond to a predefined setting
|
|
if (initial_h!=initial_v) or \
|
|
not initial_h in [-15.0,0.0,0.25,0.5,1.0,1.5,2.0]:
|
|
fe_vert_width.write(initial_v);
|
|
fe_horiz_width.write(initial_h);
|
|
fe_settling_condition.waitSettled()
|
|
|
|
class AcquisitionMode(Readable.ReadableString):
|
|
def read(self):
|
|
return str(scienta.getAcquisitionMode())
|
|
_acquisition_mode=AcquisitionMode()
|
|
|
|
class EnergyMode(Readable.ReadableString):
|
|
def read(self):
|
|
return str(scienta.getEnergyMode())
|
|
_energy_mode=EnergyMode()
|
|
|
|
class LensMode(Readable.ReadableString):
|
|
def read(self):
|
|
return str(scienta.getLensMode())
|
|
_lens_mode=LensMode()
|
|
|
|
class DetectorMode(Readable.ReadableString):
|
|
def read(self):
|
|
return str(scienta.getDetectorMode())
|
|
_detector_mode=DetectorMode()
|
|
|
|
class PassEnergy(Readable):
|
|
def read(self):
|
|
return scienta.getPassEnergy()
|
|
_pass_energy=PassEnergy()
|
|
|
|
class ElementSet(Readable.ReadableString):
|
|
def read(self):
|
|
return str(scienta.getElementSet())
|
|
_element_set=ElementSet()
|
|
|
|
|
|
###################################################################################################
|
|
# Handlig diagnostics
|
|
###################################################################################################
|
|
|
|
diag_channels = []
|
|
|
|
diag_channels.append(scienta.lowEnergy.readback)
|
|
diag_channels.append(scienta.centerEnergy.readback)
|
|
diag_channels.append(scienta.highEnergy.readback)
|
|
diag_channels.append(scienta.energyStepSize.readback)
|
|
diag_channels.append(scienta.energyWidth)
|
|
diag_channels.append(scienta.energyCount)
|
|
diag_channels.append(scienta.lowThetaY.readback)
|
|
diag_channels.append(scienta.centerThetaY.readback)
|
|
diag_channels.append(scienta.highThetaY.readback)
|
|
diag_channels.append(scienta.thetaYStepSize.readback)
|
|
diag_channels.append(scienta.thetaYWidth)
|
|
diag_channels.append(scienta.thetaYCount)
|
|
diag_channels.append(scienta.lowThetaX)
|
|
diag_channels.append(scienta.centerThetaX.readback)
|
|
diag_channels.append(scienta.highThetaX)
|
|
diag_channels.append(scienta.thetaXStepSize)
|
|
diag_channels.append(scienta.thetaXWidth)
|
|
diag_channels.append(scienta.thetaXCount)
|
|
diag_channels.append(scienta.slicesReadback)
|
|
diag_channels.append(scienta.channelsReadback)
|
|
diag_channels.append(scienta.excitationEnergy)
|
|
|
|
diag_channels.append(_acquisition_mode)
|
|
diag_channels.append(_energy_mode)
|
|
diag_channels.append(_lens_mode)
|
|
diag_channels.append(_detector_mode)
|
|
diag_channels.append(_pass_energy)
|
|
diag_channels.append(_element_set)
|
|
|
|
|
|
diag_channels = sorted(diag_channels, key=lambda channel: channel.name)
|
|
|
|
def get_diag_name(diag):
|
|
return ch.psi.utils.Str.toTitleCase(diag.getName()).replace(" ", "").replace("Readback", "")
|
|
|
|
def print_diag():
|
|
for f in diag_channels:
|
|
print "%-25s %s" % (get_diag_name(f) , str(f.read()))
|
|
|
|
def create_diag_datasets(parent = None):
|
|
if parent is None:
|
|
parent = get_exec_pars().group
|
|
group = parent + "attrs/"
|
|
for f in diag_channels:
|
|
create_dataset(group+get_diag_name(f) , 's' if (issubclass(type(f), Readable.ReadableString)) else 'd')
|
|
|
|
def append_diag_datasets(parent = None):
|
|
if parent is None:
|
|
parent = get_exec_pars().group
|
|
group = parent + "attrs/"
|
|
for f in diag_channels:
|
|
try:
|
|
x = f.read()
|
|
if x is None:
|
|
x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan')
|
|
append_dataset(group+get_diag_name(f), x)
|
|
except:
|
|
log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1]))
|
|
|
|
|
|
def handle_diagnostics(rec):
|
|
#if beam_ok:
|
|
if get_exec_pars().save:
|
|
#Saving only once the diag data
|
|
if rec.index == 0:
|
|
create_diag_datasets()
|
|
append_diag_datasets()
|
|
|
|
|
|
|
|
def trigger_scienta():
|
|
"""
|
|
Trigger new acquisition
|
|
"""
|
|
scienta.start()
|
|
scienta.waitNewImage(-1)
|
|
|
|
|
|
def dummy_trigger_scienta():
|
|
"""
|
|
Trigger detector to update the array sizes and calibration
|
|
"""
|
|
iterations = scienta.getIterations()
|
|
scienta.setIterations(1)
|
|
try:
|
|
trigger_scienta()
|
|
finally:
|
|
scienta.setIterations(iterations)
|
|
|
|
|
|
###################################################################################################
|
|
# Utilities
|
|
###################################################################################################
|
|
|
|
def fit(ydata, xdata = None):
|
|
"""
|
|
Gaussian fit
|
|
"""
|
|
if xdata is None:
|
|
xdata = frange(0, len(ydata), 1)
|
|
#ydata = to_list(ydata)
|
|
#xdata = to_list(xdata)
|
|
max_y= max(ydata)
|
|
index_max = ydata.index(max_y)
|
|
max_x= xdata[index_max]
|
|
print "Max index:" + str(index_max),
|
|
print " x:" + str(max_x),
|
|
print " y:" + str(max_y)
|
|
gaussians = fit_gaussians(ydata, xdata, [index_max,])
|
|
(norm, mean, sigma) = gaussians[0]
|
|
p = plot([ydata],["data"],[xdata], title="Fit" )[0]
|
|
fitted_gaussian_function = Gaussian(norm, mean, sigma)
|
|
scale_x = [float(min(xdata)), float(max(xdata)) ]
|
|
points = max((len(xdata)+1), 100)
|
|
resolution = (scale_x[1]-scale_x[0]) / points
|
|
fit_y = []
|
|
fit_x = frange(scale_x[0],scale_x[1],resolution, True)
|
|
for x in fit_x:
|
|
fit_y.append(fitted_gaussian_function.value(x))
|
|
p.addSeries(LinePlotSeries("fit"))
|
|
p.getSeries(1).setData(fit_x, fit_y)
|
|
|
|
if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1])/2):
|
|
print "Mean -> " + str(mean)
|
|
p.addMarker(mean, None, "Mean="+str(round(norm,2)), Color.MAGENTA.darker())
|
|
return (norm, mean, sigma)
|
|
else:
|
|
p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY)
|
|
print "Invalid gaussian fit: " + str(mean)
|
|
return (None, None, None)
|