Files
x09la/script/local.py
gac-x09la f6b0954e8f
2022-02-07 09:49:39 +01:00

404 lines
14 KiB
Python

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
def get_additional_positioners():
ret = []
try:
for dev in [exit_slit, fe_vert_width, fe_horiz_width, cff, energy]:
ret.append(dev)
except:
log("Error getting additional positioner: " + str(sys.exc_info()[1]))
return ret
###################################################################################################
# Device initialization
###################################################################################################
class Energy(PositionerBase):
def __init__(self, name, config):
ControlledVariableBase.__init__(self, name, photon_energy.config)
self.setReadback(photon_energy.getReadback())
def take(self):
return photon_energy.take()
def doRead(self):
return photon_energy.read()
def doWrite(self, val):
if not photon_energy.isInPosition(val):
change_photon_pars(_photon_energy=val)
def getMinValue(self):
er=get_energy_range()
return er[0] if er is not None else sys.maxint
def getMaxValue(self):
er=get_energy_range()
return er[1] if er is not None else -sys.maxint
add_device(Energy("energy", None), True)
def get_energy_range():
md,gr = id_mode.read(), grating.read()
if gr.startswith("G1"):
if md in ["CIRC+", "CIRC-", "LH"]:
return (20,360)
if md in ["LV"]:
return (200,360)
if md in ["LV low E"]:
return (40,120)
if md in ["LHQ"]:
return (20,200)
if md in ["OFF"]:
return (20,50)
elif gr.startswith("G2"):
if md in ["CIRC+", "CIRC-", "LH"]:
return (80,800)
if md in ["LV"]:
return (200,800)
if md in ["LV low E"]:
return (80,120)
if md in ["LHQ"]:
return (80,200)
elif gr.startswith("NIM"):
if md in ["CIRC+", "CIRC-"]:
return (20,30)
if md in ["LH", "LHQ"]:
return (10,30)
return None
class Cff(PositionerBase):
def __init__(self, name, config):
ControlledVariableBase.__init__(self, name, pgm_cff.config)
self.setReadback(pgm_cff.getReadback())
def doRead(self):
return pgm_cff.read()
def take(self):
return pgm_cff.take()
def doWrite(self, val):
if not pgm_cff.isInPosition(val):
change_photon_pars(_cff=val)
def getMinValue(self):
return pgm_cff.getMinValue()
def getMaxValue(self):
return pgm_cff.getMaxValue()
add_device(Cff("cff", None), True)
class FeSettlingCondition(SettlingCondition):
def doWait(self):
time.sleep(0.5)
cawait("X09LA-FE-SV1:TR1.DMOV", 1)
cawait("X09LA-FE-SV1:TR2.DMOV", 1)
cawait("X09LA-FE-SH1:TR1.DMOV", 1)
cawait("X09LA-FE-SH1:TR2.DMOV", 1)
class IdSettlingCondition(SettlingCondition):
def doWait(self):
time.sleep(0.5)
id_status.waitValue("done", -1)
class GrSettlingCondition(SettlingCondition):
def doWait(self):
time.sleep(0.5)
cawait("X09LA:m4.DMOV", 1)
class CffSettlingCondition(SettlingCondition):
def doWait(self):
caput("X09LA-PGM:setE.PROC",1)
time.sleep(0.5)
cawait("X09LA:m1.DMOV", 1)
cawait("X09LA:m2.DMOV", 1)
#fe_state.getSetpoint().setBlockingWrite(True)
fe_state.setSettlingCondition(FeSettlingCondition())
oper_mode.getSetpoint().setBlockingWrite(True)
photon_energy.setSettlingCondition(IdSettlingCondition())
id_mode.setSettlingCondition(IdSettlingCondition())
id_mode.setpoint.blockingWrite=True
grating.setSettlingCondition(GrSettlingCondition())
pgm_cff.setSettlingCondition(CffSettlingCondition())
def change_photon_pars(_photon_energy=None, _id_mode=None, _grating=None, _cff=None):
if (_photon_energy is not None) and (photon_energy.isInPosition(_photon_energy)):
_photon_energy = None
if _id_mode == id_mode.read():
_id_mode=None
if _grating == grating.read():
_grating=None
if _cff == pgm_cff.read():
_cff=None
if _photon_energy == _id_mode == _grating == _cff == None:
return
log("Changing photon parameters: photon_energy=" + str(_photon_energy) + " id_mode=" + str(_id_mode) + " grating=" + str(_grating) + " cff=" + str(_cff))
initial_fe_state = fe_state.read()
initial_oper_mode = oper_mode.read()
initial_v = fe_vert_width.read();
initial_h = fe_horiz_width.read()
try:
#Set front end to “Closed” state
fe_state.move("Closed")
#Set operation mode to “PGM”
oper_mode.move("PGM")
if _id_mode is not None:
#Set polarization mode
print "Setting id_mode="+str(_id_mode)
id_mode.move("OFF")
id_mode.move(_id_mode)
if _grating is not None:
#Set grating
print "Setting grating="+str(_grating)
grating.move(_grating)
if _photon_energy is not None:
#Set insertion device energy
print "Setting photon_energy="+str(_photon_energy)
photon_energy.move(_photon_energy)
if _cff is not None:
#Set cff
print "Setting cff="+str(_cff)
pgm_cff.move(_cff)
finally:
#Return operation mode to original value (e.g. “PGM+ID”)
oper_mode.move(initial_oper_mode)
#Return front end to original state (e.g. “2x2”)
fe_state.move(initial_fe_state)
#IF (initial_h, initial_v) does not correspond to a predefined setting
if (initial_h!=initial_v) or \
not initial_h in [-15.0,0.0,0.25,0.5,1.0,1.5,2.0]:
fe_vert_width.write(initial_v);
fe_horiz_width.write(initial_h);
fe_settling_condition.waitSettled()
class AcquisitionMode(Readable.ReadableString):
def read(self):
return str(scienta.getAcquisitionMode())
_acquisition_mode=AcquisitionMode()
class EnergyMode(Readable.ReadableString):
def read(self):
return str(scienta.getEnergyMode())
_energy_mode=EnergyMode()
class LensMode(Readable.ReadableString):
def read(self):
return str(scienta.getLensMode())
_lens_mode=LensMode()
class DetectorMode(Readable.ReadableString):
def read(self):
return str(scienta.getDetectorMode())
_detector_mode=DetectorMode()
class PassEnergy(Readable):
def read(self):
return scienta.getPassEnergy()
_pass_energy=PassEnergy()
class ElementSet(Readable.ReadableString):
def read(self):
return str(scienta.getElementSet())
_element_set=ElementSet()
###################################################################################################
# Handlig diagnostics
###################################################################################################
diag_channels = []
diag_channels.append(scienta.lowEnergy.readback)
diag_channels.append(scienta.centerEnergy.readback)
diag_channels.append(scienta.highEnergy.readback)
diag_channels.append(scienta.energyStepSize.readback)
diag_channels.append(scienta.energyWidth)
diag_channels.append(scienta.energyCount)
diag_channels.append(scienta.lowThetaY.readback)
diag_channels.append(scienta.centerThetaY.readback)
diag_channels.append(scienta.highThetaY.readback)
diag_channels.append(scienta.thetaYStepSize.readback)
diag_channels.append(scienta.thetaYWidth)
diag_channels.append(scienta.thetaYCount)
diag_channels.append(scienta.lowThetaX)
diag_channels.append(scienta.centerThetaX.readback)
diag_channels.append(scienta.highThetaX)
diag_channels.append(scienta.thetaXStepSize)
diag_channels.append(scienta.thetaXWidth)
diag_channels.append(scienta.thetaXCount)
diag_channels.append(scienta.slicesReadback)
diag_channels.append(scienta.channelsReadback)
diag_channels.append(scienta.excitationEnergy)
diag_channels.append(_acquisition_mode)
diag_channels.append(_energy_mode)
diag_channels.append(_lens_mode)
diag_channels.append(_detector_mode)
diag_channels.append(_pass_energy)
diag_channels.append(_element_set)
diag_channels = sorted(diag_channels, key=lambda channel: channel.name)
def get_diag_name(diag):
return ch.psi.utils.Str.toTitleCase(diag.getName()).replace(" ", "").replace("Readback", "")
def print_diag():
for f in diag_channels:
print "%-25s %s" % (get_diag_name(f) , str(f.read()))
def create_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "attrs/"
for f in diag_channels:
create_dataset(group+get_diag_name(f) , 's' if (issubclass(type(f), Readable.ReadableString)) else 'd')
def append_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "attrs/"
for f in diag_channels:
try:
x = f.read()
if x is None:
x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan')
append_dataset(group+get_diag_name(f), x)
except:
log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1]))
def handle_diagnostics(rec):
#if beam_ok:
if get_exec_pars().save:
#Saving only once the diag data
if rec.index == 0:
create_diag_datasets()
append_diag_datasets()
def trigger_scienta():
"""
Trigger new acquisition
"""
#scienta.start()
#scienta.waitNewImage(-1)
if scienta.isSimulated():
time.sleep(0.1)
else:
image_id = scienta.currentImageCount
scienta.start()
scienta.waitReady(-1)
scienta.waitNewImage(3000, image_id)
def dummy_trigger_scienta():
"""
Trigger detector to update the array sizes and calibration
"""
iterations = scienta.getIterations()
scienta.setIterations(1)
try:
trigger_scienta()
finally:
scienta.setIterations(iterations)
###################################################################################################
# Utilities
###################################################################################################
def fit(ydata, xdata = None):
"""
Gaussian fit
"""
if xdata is None:
xdata = frange(0, len(ydata), 1)
#ydata = to_list(ydata)
#xdata = to_list(xdata)
max_y= max(ydata)
index_max = ydata.index(max_y)
max_x= xdata[index_max]
print "Max index:" + str(index_max),
print " x:" + str(max_x),
print " y:" + str(max_y)
gaussians = fit_gaussians(ydata, xdata, [index_max,])
(norm, mean, sigma) = gaussians[0]
p = plot([ydata],["data"],[xdata], title="Fit" )[0]
fitted_gaussian_function = Gaussian(norm, mean, sigma)
scale_x = [float(min(xdata)), float(max(xdata)) ]
points = max((len(xdata)+1), 100)
resolution = (scale_x[1]-scale_x[0]) / points
fit_y = []
fit_x = frange(scale_x[0],scale_x[1],resolution, True)
for x in fit_x:
fit_y.append(fitted_gaussian_function.value(x))
p.addSeries(LinePlotSeries("fit"))
p.getSeries(1).setData(fit_x, fit_y)
if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1])/2):
print "Mean -> " + str(mean)
p.addMarker(mean, None, "Mean="+str(round(norm,2)), Color.MAGENTA.darker())
return (norm, mean, sigma)
else:
p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY)
print "Invalid gaussian fit: " + str(mean)
return (None, None, None)
def calc_acquisition_time(samples=1,exp=None, iter=None, images=None, mode=None, enl=None, enh=None, ens=None, tyl=None, tyh=None, tys=None, as_string=True):
if exp is None: exp = scienta.getExposure()
if iter is None: iter = scienta.getIterations()
if images is None: images = scienta.getNumImages()
if mode is None: mode = str(scienta.getAcquisitionMode())
if enl is None: enl = scienta.getLowEnergy().take()
if enh is None: enh = scienta.getHighEnergy().take()
if ens is None: ens = scienta.getEnergyStepSize().take()
if tyl is None: tyl = scienta.getLowThetaY().take()
if tyh is None: tyh = scienta.getHighThetaY().take()
if tys is None: tys = scienta.getThetaYStepSize().take()
pass_energy=float(scienta.getPassEnergy())
time_s = float(exp)*iter*images
if mode in ("Swept_Energy", "Swept_Energy_ThetaY"):
time_s = time_s * ((abs(enh - enl)+(pass_energy*0.08)) / ens + 3.0)
if mode in ("Swept_ThetaY", "Swept_Energy_ThetaY"):
time_s = time_s * (abs(tyh - tyl) / tys + 3.0)
time_s = time_s * samples
time_s = time_s + 1
if not as_string:
return time_s
#return time.strftime("%H:%M:%S" , time.gmtime(time_s))
dec= math.modf(time_s)[0]
hours = time_s // (60*60)
time_s %= (60*60)
minutes = time_s // 60
time_s %= 60
#if (hours==minutes==0) and (time_s<10) :
if (hours==minutes==time_s==0):
ret= "%0.3f" % (time_s)
else:
ret= "%02i:%02i:%02i" % (hours, minutes, time_s)
return ret