import random import ch.psi.pshell.device.Readable.ReadableArray as ReadableArray import ch.psi.pshell.device.Readable.ReadableCalibratedArray as ReadableCalibratedArray import ch.psi.pshell.device.ArrayCalibration as ArrayCalibration import ch.psi.utils.Str from mathutils import estimate_peak_indexes, fit_gaussians, create_fit_point_list, Gaussian import java.awt.Color as Color #Synchrronized Scienta counts for stat in Scienta.stats: add_device(stat, True) beam_ok = True class SimulatedOutput(Writable): def write(self, value): pass class SimulatedInput(Readable): def __init__(self): self.x = 0.0 def read(self): self.x = self.x + 0.2 noise = (random.random() - 0.5) / 20.0 return math.sin(self.x) + noise sout = SimulatedOutput() sinp = SimulatedInput() def integrate_image(vertical = True): data = Scienta.data.read() #Integrate and plot (width,height) = Scienta.getImageSize().tolist() integration = [] if vertical: for i in range(width): p=0.0 for j in range(height): p=p+data[j*width+i] integration.append(p) else: for j in range(height): p=0.0 for i in range(width): p=p+data[j*width+i] integration.append(p) return integration class ImageEnergyDistribution(ReadableCalibratedArray): def getSize(self): (width,height) = Scienta.getImageSize().tolist() return width def read(self): return to_array(integrate_image(),'d') def getCalibration(self): c=Scienta.readImageDescriptor().calibration return ArrayCalibration(c.scaleX, c.offsetX) EnergyDistribution = ImageEnergyDistribution() class ImageAngleDistribution(ReadableCalibratedArray): def getSize(self): (width,height) = Scienta.getImageSize().tolist() return height def read(self): return to_array(integrate_image(False),'d') def getCalibration(self): c=Scienta.readImageDescriptor().calibration return ArrayCalibration(c.scaleY, c.offsetY) AngleDistribution = ImageAngleDistribution() def trig_scienta(): if Scienta.isSimulated(): time.sleep(0.1) else: image_id = Scienta.currentImageCount Scienta.start() Scienta.waitReady(-1) Scienta.waitNewImage(3000, image_id) diag_channels = [] diag_channels.append(Scienta.channelBegin) #diag_channels.append(ChannelDouble("ChannelBegin", "X03DA-SCIENTA:cam1:CHANNEL_BEGIN_RBV")) diag_channels.append(Scienta.channelEnd) #diag_channels.append(ChannelDouble("ChannelEnd", "X03DA-SCIENTA:cam1:CHANNEL_END_RBV")) diag_channels.append(Scienta.sliceBegin) # diag_channels.append(ChannelDouble("SliceBegin", "X03DA-SCIENTA:cam1:SLICE_BEGIN_RBV")) diag_channels.append(Scienta.sliceEnd) #diag_channels.append(ChannelDouble("StepTime", "X03DA-SCIENTA:cam1:SLICE_END_RBV")) diag_channels.append(Scienta.numSlices) # diag_channels.append(ChannelDouble("NumSlices", "X03DA-SCIENTA:cam1:SLICES_RBV")) diag_channels.append(Scienta.frames) # diag_channels.append(ChannelDouble("NumFrames", "X03DA-SCIENTA:cam1:FRAMES")) diag_channels.append(Scienta.numChannels) #diag_channels.append(ChannelDouble("NumChannels", "X03DA-SCIENTA:cam1:NUM_CHANNELS_RBV")) diag_channels.append(Scienta.lowEnergy) #diag_channels.append(ChannelDouble("LowEnergy", "X03DA-SCIENTA:cam1:LOW_ENERGY_RBV")) diag_channels.append(Scienta.centerEnergy) #diag_channels.append(ChannelDouble("CenterEnergy", "X03DA-SCIENTA:cam1:CENTRE_ENERGY_RBV")) diag_channels.append(Scienta.highEnergy) #diag_channels.append(ChannelDouble("HighEnergy", "X03DA-SCIENTA:cam1:HIGH_ENERGY_RBV")) #TODO: These are not of Scienta device interface. Should be included? #diag_channels.append(ChannelDouble("AcquisitionModeNum", "X03DA-SCIENTA:cam1:ACQ_MODE_RBV")) #diag_channels.append(ChannelDouble("EnergyModeNum", "X03DA-SCIENTA:cam1:ENERGY_MODE_RBV")) #diag_channels.append(ChannelDouble("LensModeNum", "X03DA-SCIENTA:cam1:LENS_MODE_RBV")) #diag_channels.append(ChannelDouble("DetectorModeNum", "X03DA-SCIENTA:cam1:DETECTOR_MODE_RBV")) #diag_channels.append(ChannelDouble("PassEnergyNum", "X03DA-SCIENTA:cam1:PASS_ENERGY_RBV")) #diag_channels.append(ChannelDouble("ElementSetNum", "X03DA-SCIENTA:cam1:ELEMENT_SET_RBV")) diag_channels.append(AcquisitionMode) #diag_attrs.append(ChannelString("AcquisitionMode", "X03DA-SCIENTA:cam1:ACQ_MODE_RBV")) diag_channels.append(EnergyMode) #diag_attrs.append(ChannelString("EnergyMode", "X03DA-SCIENTA:cam1:ENERGY_MODE_RBV")) diag_channels.append(LensMode) #diag_attrs.append(ChannelString("LensMode", "X03DA-SCIENTA:cam1:LENS_MODE_RBV")) diag_channels.append(DetectorMode) #diag_attrs.append(ChannelString("DetectorMode", "X03DA-SCIENTA:cam1:DETECTOR_MODE_RBV")) diag_channels.append(PassEnergy) #diag_attrs.append(ChannelString("PassEnergy", "X03DA-SCIENTA:cam1:PASS_ENERGY_RBV")) diag_channels.append(ElementSet) #diag_attrs.append(ChannelString("ElementSet", "X03DA-SCIENTA:cam1:ELEMENT_SET_RBV")) diag_channels.append(ExcitationEnergy) #diag_channels.append(ChannelDouble("ExcitationEnergy", "X03DA-SCIENTA:cam1:EXCITATION_ENERGY_RBV")) diag_channels.append(StepSize) #diag_channels.append(ChannelDouble("StepSize", "X03DA-SCIENTA:cam1:STEP_SIZE_RBV")) diag_channels.append(NumIterations) #diag_channels.append(ChannelDouble("NumIterations", "X03DA-SCIENTA:cam1:NumExposures_RBV")) diag_channels.append(AnalyserSlit) #diag_attrs.append(ChannelString("ElemeAnalyserSlitntSet", "X03DA-SCIENTA:cam1:ANALYSER_SLIT_RBV")) #Manipulator Settings diag_channels.append(ManipulatorX) diag_channels.append(ManipulatorY) diag_channels.append(ManipulatorZ) diag_channels.append(ManipulatorTheta) diag_channels.append(ManipulatorTilt) diag_channels.append(ManipulatorPhi) # Beamline Settings diag_channels.append(MachineBumpXOffset) diag_channels.append(MachineBumpXAngle) diag_channels.append(MachineBumpYOffset) diag_channels.append(MachineBumpYAngle) diag_channels.append(DynamicBumpYOffset) diag_channels.append(DynamicBumpYAngle) diag_channels.append(FrontendVCenter) diag_channels.append(FrontendVSize) diag_channels.append(FrontendHCenter) diag_channels.append(FrontendHSize) diag_channels.append(MonoVCenter) diag_channels.append(MonoVSize) diag_channels.append(MonoBladeDown) diag_channels.append(MonoBladeUp) diag_channels.append(MonoHCenter) diag_channels.append(MonoHSize) diag_channels.append(MonoApertureMode) diag_channels.append(RefocusVCenter) diag_channels.append(RefocusVSize) diag_channels.append(RefocusHCenter) diag_channels.append(RefocusHSize) diag_channels.append(FocusYTrans) diag_channels.append(FocusZTrans) diag_channels.append(FocusXRot) diag_channels.append(FocusYRot) diag_channels.append(FocusZRot) diag_channels.append(RefocusYTrans) diag_channels.append(RefocusZTrans) diag_channels.append(RefocusXRot) diag_channels.append(RefocusYRot) diag_channels.append(RefocusZRot) diag_channels.append(MonoEnergy) diag_channels.append(MonoCff) diag_channels.append(MonoBeta) diag_channels.append(MonoTheta) diag_channels.append(ExitSlit) # Auxiliary Measurements diag_channels.append(MachineCurrent) diag_channels.append(FocusWaterTemp) diag_channels.append(SampleCurrent) diag_channels.append(RefCurrent) diag_channels.append(AuxCurrent) diag_channels.append(AuxVoltage) diag_channels.append(SampleCurrentGain) diag_channels.append(RefCurrentGain) diag_channels.append(AuxCurrentGain) diag_channels.append(SampleCurrentAveraging) diag_channels.append(RefCurrentAveraging) diag_channels.append(AuxCurrentAveraging) diag_channels.append(AuxVoltageAveraging) diag_channels.append(SampleCurrentSampling) diag_channels.append(RefCurrentSampling) diag_channels.append(AuxCurrentSampling) diag_channels.append(AuxVoltageSampling) diag_channels.append(ChamberPressure) diag_channels.append(BeamlinePressure) diag_channels.append(ManipulatorTempA) diag_channels.append(ManipulatorTempB) diag_channels.append(ManipulatorCoolFlow) diag_channels.append(ManipulatorCoolFlowSet) diag_channels.append(MonoGrating) def get_diag_name(diag): return ch.psi.utils.Str.toTitleCase(diag.getName()).replace(" ", "") def print_diag(): for f in diag_channels: print "%-25s %s" % (get_diag_name(f) , str(f.read())) def create_diag_datasets(parent = None): if parent is None: parent = get_exec_pars().group group = parent + "attrs/" for f in diag_channels: create_dataset(group+get_diag_name(f) , 's' if (type(f) is ch.psi.pshell.epics.ChannelString) else 'd') def append_diag_datasets(parent = None): if parent is None: parent = get_exec_pars().group group = parent + "attrs/" for f in diag_channels: x = f.read() if x is None: x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan') append_dataset(group+get_diag_name(f), x) def wait_beam(): if not beam_ok: print "Waiting for beam..." while not beam_ok: time.sleep(0.1) print "Beam ok" def before_readout(): wait_beam() sample_scienta = False for dev in ["Scienta.spectrum","EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix"]: if dev in SENSORS: sample_scienta = True break for dev in [Scienta.spectrum,EnergyDistribution, AngleDistribution, Scienta.dataMatrix]: if dev in SENSORS: sample_scienta = True break if sample_scienta: trig_scienta() def after_readout(rec): if get_exec_pars().persist: if rec.index == 0: create_diag_datasets() append_diag_datasets() def after_scan(): """ Close shutter and turn off analyser """ caput("X03DA-PC:AFTER-SCAN.PROC", 1) def set_adc_averaging(): value = Scienta.getStepTime().read() * 10.0 #averaging count in 100ms SampleCurrentAveraging.write(value) RefCurrentAveraging.write(value) AuxCurrentAveraging.write(value) AuxVoltageAveraging.write(value) def adjust_sensors(): #Updating ranges from Scienta Scienta.update() global SENSORS if SENSORS is not None: # Move integration to end for dev in ["Scienta.spectrum","EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix"]: if dev in SENSORS: SENSORS=SENSORS+[SENSORS.pop(SENSORS.index(dev))] for dev in ["Counts"]: if dev in SENSORS: SENSORS=[SENSORS.pop(SENSORS.index(dev))] + SENSORS if "Scienta.dataMatrix" in SENSORS or Scienta.dataMatrix in SENSORS: print "Not ACC" set_exec_pars(accumulate = False) #Device aliases for data files set_device_alias(Scienta.dataMatrix, "ScientaImage") set_device_alias(Scienta.spectrum, "ScientaSpectrum") set_device_alias(Scienta.centerEnergy, get_diag_name(Scienta.centerEnergy)) set_device_alias(Scienta.lowEnergy, get_diag_name(Scienta.lowEnergy)) set_device_alias(Scienta.highEnergy, get_diag_name(Scienta.highEnergy)) #Additional device configuration ManipulatorPhi.trustedWrite = False def fit(ydata, xdata = None): """ """ if xdata is None: xdata = frange(0, len(ydata), 1) max_y= max(ydata) index_max = ydata.index(max_y) max_x= xdata[index_max] print "Max index:" + str(index_max), print " x:" + str(max_x), print " y:" + str(max_y) gaussians = fit_gaussians(ydata, xdata, [index_max,]) (norm, mean, sigma) = gaussians[0] p = plot([ydata],["data"],[xdata], title="Fit" )[0] fitted_gaussian_function = Gaussian(norm, mean, sigma) scale_x = [float(min(xdata)), float(max(xdata)) ] points = max((len(xdata)+1), 100) resolution = (scale_x[1]-scale_x[0]) / points fit_y = [] fit_x = frange(scale_x[0],scale_x[1],resolution, True) for x in fit_x: fit_y.append(fitted_gaussian_function.value(x)) p.addSeries(LinePlotSeries("fit")) p.getSeries(1).setData(fit_x, fit_y) if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1])/2): print "Mean -> " + str(mean) p.addMarker(mean, None, "Mean="+str(round(norm,2)), Color.MAGENTA.darker()) return (norm, mean, sigma) else: p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY) print "Invalid gaussian fit: " + str(mean) return (None, None, None) def elog(title, message, attachments = [], author = None, category = "Info", domain = "", logbook = "Experiments", encoding=1): """ Add entry to ELOG. """ if author is None: author = "pshell" #get_context().getUser().name typ = "pshell" entry = "" cmd = 'G_CS_ELOG_add -l "' + logbook+ '" ' cmd = cmd + '-a "Author=' + author + '" ' cmd = cmd + '-a "Type=' + typ + '" ' cmd = cmd + '-a "Entry=' + entry + '" ' cmd = cmd + '-a "Title=' + title + '" ' cmd = cmd + '-a "Category=' + category + '" ' cmd = cmd + '-a "Domain=' + domain + '" ' for attachment in attachments: cmd = cmd + '-f "' + attachment + '" ' cmd = cmd + '-n ' + str(encoding) cmd = cmd + ' "' + message + '"' #print cmd #os.system (cmd) #print os.popen(cmd).read() import subprocess proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) (out, err) = proc.communicate() if (err is not None) and err!="": raise Exception(err) print out def get_plot_snapshots(title = None, file_type = "jpg", temp_path = get_context().setup.getContextPath()): """ Returns list with file names of plots snapshots from a plotting context. """ sleep(0.02) #Give some time to plot to be finished - it is not sync with acquisition ret = [] for p in get_plots(title): file_name = os.path.abspath(temp_path + "/" + p.getTitle() + "." + file_type) p.saveSnapshot(file_name , file_type) ret.append(file_name) return ret