###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################

import ch.psi.pshell.device.Camera as Camera


def get_additional_positioners():
    """Return the list of extra positioner devices defined for this deployment.

    The devices are referenced by their global names. If resolving or
    collecting them fails, the error is logged and whatever was collected so
    far (possibly an empty list) is returned.
    """
    ret = []
    # Bare except is kept on purpose: under Jython it also catches Java exceptions.
    try:
        for dev in [exit_slit, fe_vert_width, fe_horiz_width, cff, energy]:
            ret.append(dev)
    except:
        log("Error getting additional positioner: " + str(sys.exc_info()[1]))
    return ret


###################################################################################################
# Device initialization
###################################################################################################

class Energy(PositionerBase):
    """Positioner that delegates to 'photon_energy' and writes through change_photon_pars()."""

    def __init__(self, name, config):
        # The 'config' argument is ignored: the configuration of photon_energy is reused.
        ControlledVariableBase.__init__(self, name, photon_energy.config)
        self.setReadback(photon_energy.getReadback())

    def take(self):
        return photon_energy.take()

    def doRead(self):
        return photon_energy.read()

    def doWrite(self, val):
        # Only start the full change procedure if the value actually differs.
        if not photon_energy.isInPosition(val):
            change_photon_pars(_photon_energy=val)

    def getMinValue(self):
        # With no known range the limits are inverted (min > max).
        # NOTE(review): presumably so that no setpoint is accepted - confirm.
        er = get_energy_range()
        return er[0] if er is not None else sys.maxint

    def getMaxValue(self):
        er = get_energy_range()
        return er[1] if er is not None else -sys.maxint

    def getChannelName(self):
        return photon_energy.getChannelName()

    def getResolution(self):
        return photon_energy.getResolution()


add_device(Energy("energy", None), True)


def get_energy_range():
    """Return the (min, max) energy tuple for the current ID mode and grating.

    The ID mode is read from 'id_mode' and the grating from 'grating'.
    Returns None when the combination is not listed below.
    """
    md, gr = id_mode.read(), grating.read()
    if gr.startswith("G1"):
        if md in ["CIRC+", "CIRC-", "LH"]:
            return (20, 360)
        if md in ["LV"]:
            return (200, 360)
        if md in ["LV low E"]:
            return (40, 120)
        if md in ["LHQ"]:
            return (20, 200)
        if md in ["OFF"]:
            return (20, 50)
    elif gr.startswith("G2"):
        if md in ["CIRC+", "CIRC-", "LH"]:
            return (80, 800)
        if md in ["LV"]:
            return (200, 800)
        if md in ["LV low E"]:
            return (80, 120)
        if md in ["LHQ"]:
            return (80, 200)
    elif gr.startswith("NIM"):
        if md in ["CIRC+", "CIRC-"]:
            return (20, 30)
        if md in ["LH", "LHQ"]:
            return (10, 30)
    return None


class Cff(PositionerBase):
    """Positioner that delegates to 'pgm_cff' and writes through change_photon_pars()."""

    def __init__(self, name, config):
        # The 'config' argument is ignored: the configuration of pgm_cff is reused.
        ControlledVariableBase.__init__(self, name, pgm_cff.config)
        self.setReadback(pgm_cff.getReadback())

    def doRead(self):
        return pgm_cff.read()

    def take(self):
        return pgm_cff.take()

    def doWrite(self, val):
        # Only start the full change procedure if the value actually differs.
        if not pgm_cff.isInPosition(val):
            change_photon_pars(_cff=val)

    def getMinValue(self):
        return pgm_cff.getMinValue()

    def getMaxValue(self):
        return pgm_cff.getMaxValue()

    def getChannelName(self):
        return pgm_cff.getChannelName()


add_device(Cff("cff", None), True)


class FeSettlingCondition(SettlingCondition):
    """Settling condition for the front end: sleeps 0.5s, then waits on the four FE DMOV channels."""

    def doWait(self):
        time.sleep(0.5)
        cawait("X09LA-FE-SV1:TR1.DMOV", 1)
        cawait("X09LA-FE-SV1:TR2.DMOV", 1)
        cawait("X09LA-FE-SH1:TR1.DMOV", 1)
        cawait("X09LA-FE-SH1:TR2.DMOV", 1)


class IdSettlingCondition(SettlingCondition):
    """Settling condition for the insertion device: sleeps 0.5s, then waits for id_status 'done'."""

    def doWait(self):
        time.sleep(0.5)
        id_status.waitValue("done", -1)


class GrSettlingCondition(SettlingCondition):
    """Settling condition for the grating: sleeps 0.5s, then waits on the m4 DMOV channel."""

    def doWait(self):
        time.sleep(0.5)
        cawait("X09LA:m4.DMOV", 1)


class CffSettlingCondition(SettlingCondition):
    """Settling condition for cff: processes setE, sleeps 0.5s, then waits on m1 and m2 DMOV."""

    def doWait(self):
        caput("X09LA-PGM:setE.PROC", 1)
        time.sleep(0.5)
        cawait("X09LA:m1.DMOV", 1)
        cawait("X09LA:m2.DMOV", 1)


# The front-end settling condition is kept in a named global because
# change_photon_pars() waits on it after restoring the front-end widths.
fe_settling_condition = FeSettlingCondition()

#fe_state.getSetpoint().setBlockingWrite(True)
fe_state.setSettlingCondition(fe_settling_condition)
oper_mode.getSetpoint().setBlockingWrite(True)
photon_energy.setSettlingCondition(IdSettlingCondition())
id_mode.setSettlingCondition(IdSettlingCondition())
id_mode.setpoint.blockingWrite = True
grating.setSettlingCondition(GrSettlingCondition())
pgm_cff.setSettlingCondition(CffSettlingCondition())
id_energy.setSettlingCondition(IdSettlingCondition())
id_energy.setpoint.blockingWrite = True


def change_photon_pars(_photon_energy=None, _id_mode=None, _grating=None, _cff=None):
    """Change photon energy, ID mode, grating and/or cff with the front end closed.

    Parameters that are None, or that already match the current device value,
    are skipped. If nothing is left to change the function returns immediately.
    Otherwise the front end is moved to "Closed" and the operation mode to
    "PGM", the grating-side and ID-side moves are run through parallelize(),
    and finally the initial operation mode, front-end state and (when needed)
    front-end widths are restored - also if a move raised an exception.
    """
    if (_photon_energy is not None) and photon_energy.isInPosition(_photon_energy):
        _photon_energy = None
    if _id_mode == id_mode.read():
        _id_mode = None
    if _grating == grating.read():
        _grating = None
    if _cff == pgm_cff.read():
        _cff = None
    if all(par is None for par in (_photon_energy, _id_mode, _grating, _cff)):
        return

    log("Changing photon parameters: photon_energy=" + str(_photon_energy) + " id_mode=" + str(_id_mode) +
        " grating=" + str(_grating) + " cff=" + str(_cff))

    initial_fe_state = fe_state.read()
    initial_oper_mode = oper_mode.read()
    initial_v = fe_vert_width.read()
    initial_h = fe_horiz_width.read()

    # Single-argument print(...) produces the same output as the Python 2 print statement.
    def move_grating(_grating, _photon_energy, _cff):
        if _grating is not None:
            # Set grating
            print("Setting grating=" + str(_grating))
            grating.move(_grating)
        if _photon_energy is not None:
            # Set moni energy
            print("Setting photon_energy=" + str(_photon_energy))
            photon_energy.move(_photon_energy)
        if _cff is not None:
            # Set cff
            print("Setting cff=" + str(_cff))
            pgm_cff.move(_cff)

    def move_id(_id_mode, _photon_energy):
        if _id_mode is not None:
            # Set polarization mode: the ID is first moved to "OFF", then to the new mode
            print("Setting id_mode=" + str(_id_mode))
            id_mode.move("OFF")
            id_mode.move(_id_mode)
        if _photon_energy is not None:
            # Set insertion device energy
            print("Setting id_energy=" + str(_photon_energy))
            id_energy.move(_photon_energy)

    try:
        # Set front end to "Closed" state
        fe_state.move("Closed")
        # Set operation mode to "PGM"
        oper_mode.move("PGM")
        parallelize((move_grating, (_grating, _photon_energy, _cff)),
                    (move_id, (_id_mode, _photon_energy)))
    finally:
        # Return operation mode to original value (e.g. "PGM+ID")
        oper_mode.move(initial_oper_mode)
        # Return front end to original state (e.g. "2x2")
        fe_state.move(initial_fe_state)
        # If (initial_h, initial_v) does not correspond to a predefined setting,
        # the widths must be restored explicitly.
        if (initial_h != initial_v) or \
                initial_h not in [-15.0, 0.0, 0.25, 0.5, 1.0, 1.5, 2.0]:
            fe_vert_width.write(initial_v)
            fe_horiz_width.write(initial_h)
            fe_settling_condition.waitSettled()


# String/number wrappers around scienta getters, used as diagnostic channels.

class AcquisitionMode(Readable.ReadableString):
    def read(self):
        return str(scienta.getAcquisitionMode())

_acquisition_mode = AcquisitionMode()


class EnergyMode(Readable.ReadableString):
    def read(self):
        return str(scienta.getEnergyMode())

_energy_mode = EnergyMode()


class LensMode(Readable.ReadableString):
    def read(self):
        return str(scienta.getLensMode())

_lens_mode = LensMode()


class DetectorMode(Readable.ReadableString):
    def read(self):
        return str(scienta.getDetectorMode())

_detector_mode = DetectorMode()


class PassEnergy(Readable):
    def read(self):
        return scienta.getPassEnergy()

_pass_energy = PassEnergy()


class ElementSet(Readable.ReadableString):
    def read(self):
        return str(scienta.getElementSet())

_element_set = ElementSet()


###################################################################################################
# Handling diagnostics
###################################################################################################

diag_channels = []
diag_channels.append(scienta.lowEnergy.readback)
diag_channels.append(scienta.centerEnergy.readback)
diag_channels.append(scienta.highEnergy.readback)
diag_channels.append(scienta.energyStepSize.readback)
diag_channels.append(scienta.energyWidth)
diag_channels.append(scienta.energyCount)
diag_channels.append(scienta.lowThetaY.readback)
diag_channels.append(scienta.centerThetaY.readback)
diag_channels.append(scienta.highThetaY.readback)
diag_channels.append(scienta.thetaYStepSize.readback)
diag_channels.append(scienta.thetaYWidth)
diag_channels.append(scienta.thetaYCount)
diag_channels.append(scienta.lowThetaX)
diag_channels.append(scienta.centerThetaX.readback)
diag_channels.append(scienta.highThetaX)
diag_channels.append(scienta.thetaXStepSize)
diag_channels.append(scienta.thetaXWidth)
diag_channels.append(scienta.thetaXCount)
diag_channels.append(scienta.slicesReadback)
diag_channels.append(scienta.channelsReadback)
diag_channels.append(scienta.excitationEnergy)
diag_channels.append(_acquisition_mode)
diag_channels.append(_energy_mode)
diag_channels.append(_lens_mode)
diag_channels.append(_detector_mode)
diag_channels.append(_pass_energy)
diag_channels.append(_element_set)
diag_channels = sorted(diag_channels, key=lambda channel: channel.name)


def get_diag_name(diag):
    """Return the dataset name of a diagnostic: title-cased device name without spaces or 'Readback'."""
    return ch.psi.utils.Str.toTitleCase(diag.getName()).replace(" ", "").replace("Readback", "")


def print_diag():
    """Print the name and current value of every diagnostic channel."""
    for f in diag_channels:
        # Single-argument print(...) produces the same output as the Python 2 print statement.
        print("%-25s %s" % (get_diag_name(f), str(f.read())))


def _is_string_diag(diag):
    """Tell whether the dataset of a diagnostic is created with the string type ('s')."""
    return issubclass(type(diag), Readable.ReadableString)


def create_diag_datasets(parent=None):
    """Create one dataset per diagnostic channel under '<parent>attrs/'.

    parent defaults to the group of the current execution parameters.
    String diagnostics get type 's', all others type 'd'.
    """
    if parent is None:
        parent = get_exec_pars().group
    group = parent + "attrs/"
    for f in diag_channels:
        create_dataset(group + get_diag_name(f), 's' if _is_string_diag(f) else 'd')


def append_diag_datasets(parent=None):
    """Read every diagnostic channel and append the value to its dataset.

    parent defaults to the group of the current execution parameters.
    A None reading is replaced by '' for string diagnostics and by NaN
    otherwise. A failure on one channel is logged and does not stop the others.
    """
    if parent is None:
        parent = get_exec_pars().group
    group = parent + "attrs/"
    for f in diag_channels:
        # Bare except is kept on purpose: under Jython it also catches Java exceptions.
        try:
            x = f.read()
            if x is None:
                # The placeholder must match the dataset type chosen in
                # create_diag_datasets(); ChannelString is kept from the original test.
                is_string = _is_string_diag(f) or (type(f) is ch.psi.pshell.epics.ChannelString)
                x = '' if is_string else float('nan')
            append_dataset(group + get_diag_name(f), x)
        except:
            log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1]))


def handle_diagnostics(rec):
    """Save the diagnostics for a scan record, if saving is enabled.

    The datasets are created on the first record (index 0) and a sample is
    appended for every record.
    """
    #if beam_ok:
    if get_exec_pars().save:
        # Create the diag datasets only once
        if rec.index == 0:
            create_diag_datasets()
        append_diag_datasets()


def trigger_scienta():
    """
    Trigger new acquisition
    """
    if scienta.isSimulated():
        time.sleep(0.1)
    else:
        # The image count is taken before starting and passed to waitNewImage.
        image_id = scienta.currentImageCount
        scienta.start()
        scienta.waitReady(-1)
        scienta.waitNewImage(10000, image_id)


def dummy_trigger_scienta():
    """
    Trigger detector to update the array sizes and calibration
    """
    iterations = scienta.getIterations()
    scienta.setIterations(1)
    try:
        trigger_scienta()
    finally:
        # Always restore the configured number of iterations.
        scienta.setIterations(iterations)


###################################################################################################
# Utilities
###################################################################################################

def fit(ydata, xdata=None):
    """
    Gaussian fit

    Fits a single gaussian to ydata, seeded at the index of its maximum, and
    plots the data together with the fitted curve. xdata defaults to
    frange(0, len(ydata), 1). ydata must support .index() (e.g. a list).

    Returns (norm, mean, sigma) if the fit is accepted, else (None, None, None).
    """
    if xdata is None:
        xdata = frange(0, len(ydata), 1)
    #ydata = to_list(ydata)
    #xdata = to_list(xdata)
    max_y = max(ydata)
    index_max = ydata.index(max_y)
    max_x = xdata[index_max]
    # One print with the same text the original three comma-chained prints produced.
    print("Max index:" + str(index_max) + "  x:" + str(max_x) + "  y:" + str(max_y))
    gaussians = fit_gaussians(ydata, xdata, [index_max, ])
    (norm, mean, sigma) = gaussians[0]
    p = plot([ydata], ["data"], [xdata], title="Fit")[0]
    fitted_gaussian_function = Gaussian(norm, mean, sigma)
    scale_x = [float(min(xdata)), float(max(xdata))]
    points = max((len(xdata) + 1), 100)
    resolution = (scale_x[1] - scale_x[0]) / points
    fit_y = []
    fit_x = frange(scale_x[0], scale_x[1], resolution, True)
    for x in fit_x:
        fit_y.append(fitted_gaussian_function.value(x))
    p.addSeries(LinePlotSeries("fit"))
    p.getSeries(1).setData(fit_x, fit_y)

    # NOTE(review): the threshold is the midpoint of the x range, not its half width,
    # so ranges centered on zero reject every fit - confirm whether this is intended.
    if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1]) / 2):
        print("Mean -> " + str(mean))
        # The marker sits at the mean, so the label shows the mean (it used to show norm).
        p.addMarker(mean, None, "Mean=" + str(round(mean, 2)), Color.MAGENTA.darker())
        return (norm, mean, sigma)
    else:
        p.addMarker(max_x, None, "Max=" + str(round(max_x, 2)), Color.GRAY)
        print("Invalid gaussian fit: " + str(mean))
        return (None, None, None)


def calc_acquisition_time(samples=1, exp=None, iter=None, images=None, mode=None,
                          enl=None, enh=None, ens=None, tyl=None, tyh=None, tys=None,
                          as_string=True):
    """Estimate the acquisition time from the detector settings.

    Every argument left as None is read from 'scienta'. The base time is
    exp*iter*images; it is multiplied by the number of energy steps for the
    swept-energy modes and by the number of ThetaY steps for the swept-ThetaY
    modes, then by 'samples', and one second is added.

    Returns the time in seconds if as_string is False, otherwise a formatted
    string ("HH:MM:SS").
    """
    if exp is None:
        exp = scienta.getExposure()
    if iter is None:
        iter = scienta.getIterations()
    if images is None:
        images = scienta.getNumImages()
    if mode is None:
        mode = str(scienta.getAcquisitionMode())
    if enl is None:
        enl = scienta.getLowEnergy().take()
    if enh is None:
        enh = scienta.getHighEnergy().take()
    if ens is None:
        ens = scienta.getEnergyStepSize().take()
    if tyl is None:
        tyl = scienta.getLowThetaY().take()
    if tyh is None:
        tyh = scienta.getHighThetaY().take()
    if tys is None:
        tys = scienta.getThetaYStepSize().take()
    pass_energy = float(scienta.getPassEnergy())

    time_s = float(exp) * iter * images
    if mode in ("Swept_Energy", "Swept_Energy_ThetaY"):
        time_s = time_s * ((abs(enh - enl) + (pass_energy * 0.08)) / ens + 3.0)
    if mode in ("Swept_ThetaY", "Swept_Energy_ThetaY"):
        time_s = time_s * (abs(tyh - tyl) / tys + 3.0)
    time_s = time_s * samples
    time_s = time_s + 1
    if not as_string:
        return time_s

    #return time.strftime("%H:%M:%S" , time.gmtime(time_s))
    hours = time_s // (60 * 60)
    time_s %= (60 * 60)
    minutes = time_s // 60
    time_s %= 60
    #if (hours==minutes==0) and (time_s<10) :
    if (hours == minutes == time_s == 0):
        ret = "%0.3f" % (time_s)
    else:
        ret = "%02i:%02i:%02i" % (hours, minutes, time_s)
    return ret


def get_device_channel(dev):
    """Return dev.getChannelName() if the device has that method, else None.

    dev is resolved with string_to_obj() first.
    """
    dev = string_to_obj(dev)
    if "getChannelName" in dir(dev):
        return dev.getChannelName()
    return None


def _set_channel_attributes(devices, get_path, kind):
    """Write the "channel" attribute of each device that has a channel name (best effort)."""
    if devices is None:
        return
    for dev in devices:
        channel = get_device_channel(dev)
        if channel:
            # Best effort: a failure on one device is logged and the rest still proceed.
            # Bare except is kept on purpose: under Jython it also catches Java exceptions.
            try:
                set_attribute(get_path(dev), "channel", channel)
            except:
                log("Error setting channel attribute of " + kind + " " + str(dev) + ": " +
                    str(sys.exc_info()[1]))


def set_device_channel_names(scan, sensors=None, snaps=None, diags=None, monitors=None):
    """Store the channel name of scan devices as the "channel" attribute of their data paths.

    Each of sensors, snaps, diags and monitors is an optional list of devices.
    Devices without a channel name are skipped. Errors are logged and do not
    interrupt the processing of the remaining devices.
    """
    layout = get_context().dataManager.layout
    _set_channel_attributes(
        sensors, lambda dev: layout.getScanPath(scan) + string_to_obj(dev).alias, "sensor")
    _set_channel_attributes(
        snaps, lambda dev: layout.getSnapPathName(scan, dev), "snap")
    _set_channel_attributes(
        diags, lambda dev: layout.getDiagPathName(scan, dev), "diag")
    _set_channel_attributes(
        monitors, lambda dev: layout.getMonitorPathName(scan, string_to_obj(dev)), "monitor")