import random
import ch.psi.pshell.device.Readable.ReadableArray as ReadableArray
import ch.psi.pshell.device.Readable.ReadableCalibratedArray as ReadableCalibratedArray
import ch.psi.pshell.device.ArrayCalibration as ArrayCalibration
import ch.psi.utils.Str
from mathutils import estimate_peak_indexes, fit_gaussians, create_fit_point_list, Gaussian
import java.awt.Color as Color

# Scienta class is imported in startup.py, shadowing Scienta device name
Scienta = get_device("Scienta")

# Synchronized Scienta counts
for stat in Scienta.stats:
    add_device(stat, True)

# Beam status flag; read by wait_beam() and after_readout() in this file.
beam_ok = True


class SimulatedOutput(Writable):
    """Writable that discards every value written to it."""

    def write(self, value):
        pass


class SimulatedInput(Readable):
    """Readable returning a noisy sine; the phase advances by 0.2 per read."""

    def __init__(self):
        self.x = 0.0

    def read(self):
        self.x += 0.2
        jitter = (random.random() - 0.5) / 20.0
        return math.sin(self.x) + jitter


sout = SimulatedOutput()
sinp = SimulatedInput()


def integrate_image(vertical = True):
    """
    Sum the current Scienta image along one axis.

    The image is read as a flat array indexed as data[row * width + column].
    vertical=True: returns one sum per column (list of length width).
    vertical=False: returns one sum per row (list of length height).
    """
    pixels = Scienta.dataArray.read()
    (width, height) = Scienta.getImageSize().tolist()
    if vertical:
        return [sum((pixels[row * width + col] for row in range(height)), 0.0)
                for col in range(width)]
    return [sum((pixels[row * width + col] for col in range(width)), 0.0)
            for row in range(height)]


class ImageEnergyDistribution(ReadableCalibratedArray):
    """Column sums of the Scienta image, calibrated with the image X scale/offset."""

    def getSize(self):
        (width, _height) = Scienta.getImageSize().tolist()
        return width

    def read(self):
        return to_array(integrate_image(), 'd')

    def getCalibration(self):
        calibration = Scienta.readImageDescriptor().calibration
        if calibration is None:
            return None
        return ArrayCalibration(calibration.scaleX, calibration.offsetX)


EnergyDistribution = ImageEnergyDistribution()


class ImageAngleDistribution(ReadableCalibratedArray):
    """Row sums of the Scienta image, calibrated with the image Y scale/offset."""

    def getSize(self):
        (_width, height) = Scienta.getImageSize().tolist()
        return height

    def read(self):
        return to_array(integrate_image(False), 'd')

    def getCalibration(self):
        calibration = Scienta.readImageDescriptor().calibration
        if calibration is None:
            return None
        return ArrayCalibration(calibration.scaleY, calibration.offsetY)


AngleDistribution = ImageAngleDistribution()


class ImageCounts(Readable):
    """Sum of all elements of the Scienta data array."""

    def read(self):
        return sum(Scienta.dataArray.read())


Counts = ImageCounts()


def init_scienta():
    """
    turn on the analyser and start a mock measurement so that we get the correct array size.

    start a scienta acquisition and abort after 4 seconds.
    """
    if Scienta.isSimulated():
        time.sleep(0.1)
        return
    # Remember the image counter before starting so that waitNewImage
    # can tell when a new image has arrived.
    image_id = Scienta.currentImageCount
    Scienta.start()
    Scienta.waitReady(4000)
    Scienta.stop()
    Scienta.waitNewImage(500, image_id)


def trig_scienta():
    """
    start a scienta acquisition, wait until it is ready and a new image has arrived.

    in simulation only a short sleep is done.
    """
    if Scienta.isSimulated():
        time.sleep(0.1)
        return
    image_id = Scienta.currentImageCount
    Scienta.start()
    Scienta.waitReady(-1)
    Scienta.waitNewImage(3000, image_id)


from keithley import KeiSample, KeiReference


def prepare_keithleys(dwell, triggered):
    """
    prepare keithleys.

    at the moment, the dwell time has to be set manually by selecting one of the poll modes
    slow = 100 ms, medium = 20 ms, fast = 2 ms.
    """
    for keithley in (KeiSample, KeiReference):
        keithley.prepare(dwell, triggered)


def trig_keithleys():
    """
    trigger keithleys, do not wait.

    after this, you have to wait for at least the dwell time before reading the value!
    """
    for keithley in (KeiSample, KeiReference):
        keithley.trig()


def wait_keithleys():
    """
    wait so that the keithleys can finish their measurement.

    the sleep time is 2.2 times the dwell time of KeiSample.
    """
    time.sleep(KeiSample.dwell * 2.2)


def fetch_keithleys():
    """
    read the keithley readings into EPICS.

    this requires that at least the dwell time has passed since the last trigger.
    the value can then be read from the SampleCurrent and ReferenceCurrent devices.
    """
    for keithley in (KeiSample, KeiReference):
        keithley.fetch()


def release_keithleys():
    """
    switch keithleys to free run.

    0.1 s polling and dwell time
    """
    for keithley in (KeiSample, KeiReference):
        keithley.release()


def otf(mode="ENERGY", e1=None, e2=None, beta1=None, beta2=None, theta1=None, theta2=None,
        time=1.0, modulo=1, turn_off_beam=False):
    """
    Run the "otf" script with the given parameters.

    mode: "ENERGY" or "ANGLE"
    e1, e2, beta1, beta2, theta1, theta2: passed as float, or None if not given.
    time: passed as float (this parameter hides the time module inside this function).
    modulo: passed as int.
    turn_off_beam: passed as the "ENDSCAN" argument.
    """
    def optional_float(value):
        return None if value is None else float(value)

    script_args = {
        "MODE": mode,
        "E1": optional_float(e1),
        "E2": optional_float(e2),
        "BETA1": optional_float(beta1),
        "BETA2": optional_float(beta2),
        "THETA1": optional_float(theta1),
        "THETA2": optional_float(theta2),
        "TIME": float(time),
        "MODULO": int(modulo),
        "ENDSCAN": turn_off_beam,
    }
    run("otf", script_args)


diag_channels = [
    # Manipulator Settings
    ManipulatorX.readback,
    ManipulatorY.readback,
    ManipulatorZ.readback,
    ManipulatorTheta.readback,
    ManipulatorTilt.readback,
    ManipulatorPhi.readback,

    # Beamline Settings
    MachineBumpXOffset,
    MachineBumpXAngle,
    MachineBumpYOffset,
    MachineBumpYAngle,
    DynamicBumpYOffset,
    DynamicBumpYAngle,
    FrontendVCenter,
    FrontendVSize,
    FrontendHCenter,
    FrontendHSize,
    MonoVCenter,
    MonoVSize,
    MonoBladeDown,
    MonoBladeUp,
    MonoHCenter,
    MonoHSize,
    MonoApertureMode,
    RefocusVCenter,
    RefocusVSize,
    RefocusHCenter,
    RefocusHSize,
    FocusYTrans,
    FocusZTrans,
    FocusXRot,
    FocusYRot,
    FocusZRot,
    RefocusYTrans,
]
diag_channels.append(RefocusZTrans) diag_channels.append(RefocusXRot) diag_channels.append(RefocusYRot) diag_channels.append(RefocusZRot) diag_channels.append(MonoEnergy) diag_channels.append(MonoCff) diag_channels.append(MonoBeta) diag_channels.append(MonoTheta) diag_channels.append(MonoGrating) diag_channels.append(ExitSlit) # Auxiliary Measurements diag_channels.append(MachineCurrent) diag_channels.append(FocusWaterTemp) diag_channels.append(SampleCurrent) diag_channels.append(RefCurrent) #diag_channels.append(AuxCurrent) #diag_channels.append(AuxVoltage) diag_channels.append(SampleCurrentGain) diag_channels.append(RefCurrentGain) #diag_channels.append(AuxCurrentGain) #diag_channels.append(SampleCurrentAveraging) #diag_channels.append(RefCurrentAveraging) #diag_channels.append(AuxCurrentAveraging) #diag_channels.append(AuxVoltageAveraging) #diag_channels.append(SampleCurrentSampling) #diag_channels.append(RefCurrentSampling) #diag_channels.append(AuxCurrentSampling) #diag_channels.append(AuxVoltageSampling) diag_channels.append(ChamberPressure) diag_channels.append(BeamlinePressure) diag_channels.append(ManipulatorTempA) diag_channels.append(ManipulatorTempB) if get_device("ManipulatorCoolFlow"): diag_channels.append(ManipulatorCoolFlow) if get_device("ManipulatorCoolFlowSet"): diag_channels.append(ManipulatorCoolFlowSet) diag_channels_no_Scienta = list(diag_channels) # Scienta diag_channels.append(Scienta.channelBegin) #diag_channels.append(ChannelDouble("ChannelBegin", "X03DA-SCIENTA:cam1:CHANNEL_BEGIN_RBV")) diag_channels.append(Scienta.channelEnd) #diag_channels.append(ChannelDouble("ChannelEnd", "X03DA-SCIENTA:cam1:CHANNEL_END_RBV")) diag_channels.append(Scienta.sliceBegin) # diag_channels.append(ChannelDouble("SliceBegin", "X03DA-SCIENTA:cam1:SLICE_BEGIN_RBV")) diag_channels.append(Scienta.sliceEnd) #diag_channels.append(ChannelDouble("StepTime", "X03DA-SCIENTA:cam1:SLICE_END_RBV")) diag_channels.append(Scienta.numSlices) # 
diag_channels.append(ChannelDouble("NumSlices", "X03DA-SCIENTA:cam1:SLICES_RBV")) #diag_channels.append(Scienta.frames) # diag_channels.append(ChannelDouble("NumFrames", "X03DA-SCIENTA:cam1:FRAMES")) diag_channels.append(Scienta.numChannels) #diag_channels.append(ChannelDouble("NumChannels", "X03DA-SCIENTA:cam1:NUM_CHANNELS_RBV")) diag_channels.append(Scienta.lowEnergy) #diag_channels.append(ChannelDouble("LowEnergy", "X03DA-SCIENTA:cam1:LOW_ENERGY_RBV")) diag_channels.append(Scienta.centerEnergy) #diag_channels.append(ChannelDouble("CenterEnergy", "X03DA-SCIENTA:cam1:CENTRE_ENERGY_RBV")) diag_channels.append(Scienta.highEnergy) #diag_channels.append(ChannelDouble("HighEnergy", "X03DA-SCIENTA:cam1:HIGH_ENERGY_RBV")) diag_channels.append(ScientaDwellTime) diag_channels.append(AcquisitionMode) #diag_attrs.append(ChannelString("AcquisitionMode", "X03DA-SCIENTA:cam1:ACQ_MODE_RBV")) diag_channels.append(EnergyMode) #diag_attrs.append(ChannelString("EnergyMode", "X03DA-SCIENTA:cam1:ENERGY_MODE_RBV")) diag_channels.append(LensMode) #diag_attrs.append(ChannelString("LensMode", "X03DA-SCIENTA:cam1:LENS_MODE_RBV")) diag_channels.append(DetectorMode) #diag_attrs.append(ChannelString("DetectorMode", "X03DA-SCIENTA:cam1:DETECTOR_MODE_RBV")) diag_channels.append(PassEnergy) #diag_attrs.append(ChannelString("PassEnergy", "X03DA-SCIENTA:cam1:PASS_ENERGY_RBV")) diag_channels.append(ElementSet) #diag_attrs.append(ChannelString("ElementSet", "X03DA-SCIENTA:cam1:ELEMENT_SET_RBV")) diag_channels.append(ExcitationEnergy) #diag_channels.append(ChannelDouble("ExcitationEnergy", "X03DA-SCIENTA:cam1:EXCITATION_ENERGY_RBV")) diag_channels.append(StepSize) #diag_channels.append(ChannelDouble("StepSize", "X03DA-SCIENTA:cam1:STEP_SIZE_RBV")) diag_channels.append(NumIterations) #diag_channels.append(ChannelDouble("NumIterations", "X03DA-SCIENTA:cam1:NumExposures_RBV")) diag_channels.append(AnalyserSlit) #diag_attrs.append(ChannelString("ElemeAnalyserSlitntSet", 
"X03DA-SCIENTA:cam1:ANALYSER_SLIT_RBV")) snap_channels = [] snap_channels.append(KeiSample.rangeCh) snap_channels.append(KeiSample.usermodeCh) snap_channels.append(KeiSample.tottimeCh) snap_channels.append(KeiSample.setvoltageCh) snap_channels.append(KeiSample.voltoutCh) snap_channels.append(KeiSample.invertreadoutCh) snap_channels.append(KeiReference.rangeCh) snap_channels.append(KeiReference.usermodeCh) snap_channels.append(KeiReference.tottimeCh) snap_channels.append(KeiReference.setvoltageCh) snap_channels.append(KeiReference.voltoutCh) snap_channels.append(KeiReference.invertreadoutCh) diag_channels = sorted(diag_channels, key=lambda channel: channel.name) snap_channels = sorted(snap_channels, key=lambda channel: channel.name) def get_diag_name(diag): return ch.psi.utils.Str.toTitleCase(diag.getName()).replace(" ", "").replace("Readback", "") def print_diag(): for f in diag_channels: print "%-25s %s" % (get_diag_name(f) , str(f.read())) def create_diag_datasets(parent = None): if parent is None: parent = get_exec_pars().group group = parent + "attrs/" for f in diag_channels: create_dataset(group+get_diag_name(f) , 's' if (type(f) is ch.psi.pshell.epics.ChannelString) else 'd') def append_diag_datasets(parent = None): if parent is None: parent = get_exec_pars().group group = parent + "attrs/" for f in diag_channels: try: x = f.read() if x is None: x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan') append_dataset(group+get_diag_name(f), x) except: log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1])) def create_metadata_datasets(parent = None): if parent is None: parent = "/" group = parent + "general/" for name in ["proposer", "proposal", "pgroup", "sample"]: setting = get_setting(name) save_dataset(group+name, setting if setting is not None else "", 's') setting = get_setting("authors") save_dataset(group+"authors", setting.split("|") if setting is not None else [""], '[s') def wait_beam(): if not beam_ok: print 
"Waiting for beam..." while not beam_ok: time.sleep(0.1) print "Beam ok" def before_readout(): sample_scienta = False for dev in ["Scienta.spectrum","EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix", "Counts"]: if dev in SENSORS: sample_scienta = True break for dev in [Scienta.spectrum,EnergyDistribution, AngleDistribution, Scienta.dataMatrix, Counts]: if dev in SENSORS: sample_scienta = True break wait_beam() trig_keithleys() if sample_scienta: trig_scienta() else: wait_keithleys() fetch_keithleys() def after_readout(rec, scan): if beam_ok: if get_exec_pars().save: if rec.index == 0: if scan.index == 1: create_metadata_datasets() create_diag_datasets() append_diag_datasets() else: rec.invalidate() def after_scan(): """ Close shutter and turn off analyser """ caput("X03DA-PC:AFTER-SCAN.PROC", 1) caput("X03DA-OP-VG7:WT_SET", 0) #caput("X03DA-FE-AB1:CLOSE4BL", 0) #release_keithleys() def set_adc_averaging(dwelltime=0.0): if dwelltime == 0.0: dwelltime = Scienta.getStepTime().read() dwelltime = min(dwelltime, 20.0) dwelltime = max(dwelltime, 0.1) fixed = AcquisitionMode.read() == "Fixed" else: fixed = True prepare_keithleys(dwelltime, fixed) #value = Scienta.getStepTime().read() * 10.0 #averaging count in 100ms #SampleCurrentAveraging.write(value) #RefCurrentAveraging.write(value) #AuxCurrentAveraging.write(value) #AuxVoltageAveraging.write(value) def adjust_sensors(): #Updating ranges from Scienta Scienta.update() global SENSORS if SENSORS is not None: # Move integration to end #sample_scienta = False for dev in ["Scienta.spectrum","EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix"]: if dev in SENSORS: #sample_scienta = True SENSORS=SENSORS+[SENSORS.pop(SENSORS.index(dev))] for dev in ["Counts"]: if dev in SENSORS: #sample_scienta = True SENSORS=[SENSORS.pop(SENSORS.index(dev))] + SENSORS if "Scienta.dataMatrix" in SENSORS or Scienta.dataMatrix in SENSORS: print "Not ACC" set_exec_pars(accumulate = False) #if sample_scienta: # 
init_scienta() #Device aliases for data files set_device_alias(Scienta.dataMatrix, "ScientaImage") set_device_alias(Scienta.spectrum, "ScientaSpectrum") set_device_alias(Scienta.channelBegin, get_diag_name(Scienta.channelBegin)) set_device_alias(Scienta.channelEnd, get_diag_name(Scienta.channelEnd)) set_device_alias(Scienta.sliceBegin, get_diag_name(Scienta.sliceBegin)) set_device_alias(Scienta.sliceEnd, get_diag_name(Scienta.sliceEnd)) set_device_alias(Scienta.numChannels, get_diag_name(Scienta.numChannels)) set_device_alias(Scienta.numSlices, get_diag_name(Scienta.numSlices)) set_device_alias(Scienta.lowEnergy, get_diag_name(Scienta.lowEnergy)) set_device_alias(Scienta.centerEnergy, get_diag_name(Scienta.centerEnergy)) set_device_alias(Scienta.highEnergy, get_diag_name(Scienta.highEnergy)) set_device_alias(ManipulatorX.readback, get_diag_name(ManipulatorX.readback)) set_device_alias(ManipulatorY.readback, get_diag_name(ManipulatorY.readback)) set_device_alias(ManipulatorZ.readback, get_diag_name(ManipulatorZ.readback)) set_device_alias(ManipulatorTheta.readback, get_diag_name(ManipulatorTheta.readback)) set_device_alias(ManipulatorTilt.readback, get_diag_name(ManipulatorTilt.readback)) set_device_alias(ManipulatorPhi.readback, get_diag_name(ManipulatorPhi.readback)) #Additional device configuration ManipulatorPhi.trustedWrite = False def fit(ydata, xdata = None): """ """ if xdata is None: xdata = frange(0, len(ydata), 1) max_y= max(ydata) index_max = ydata.index(max_y) max_x= xdata[index_max] print "Max index:" + str(index_max), print " x:" + str(max_x), print " y:" + str(max_y) gaussians = fit_gaussians(ydata, xdata, [index_max,]) (norm, mean, sigma) = gaussians[0] p = plot([ydata],["data"],[xdata], title="Fit" )[0] fitted_gaussian_function = Gaussian(norm, mean, sigma) scale_x = [float(min(xdata)), float(max(xdata)) ] points = max((len(xdata)+1), 100) resolution = (scale_x[1]-scale_x[0]) / points fit_y = [] fit_x = frange(scale_x[0],scale_x[1],resolution, True) 
for x in fit_x: fit_y.append(fitted_gaussian_function.value(x)) p.addSeries(LinePlotSeries("fit")) p.getSeries(1).setData(fit_x, fit_y) if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1])/2): print "Mean -> " + str(mean) p.addMarker(mean, None, "Mean="+str(round(norm,2)), Color.MAGENTA.darker()) return (norm, mean, sigma) else: p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY) print "Invalid gaussian fit: " + str(mean) return (None, None, None) def elog(title, message, attachments = [], author = None, category = "Info", domain = "", logbook = "Experiments", encoding=1): """ Add entry to ELOG. """ if author is None: author = "pshell" #get_context().getUser().name typ = "pshell" entry = "" cmd = 'G_CS_ELOG_add -l "' + logbook+ '" ' cmd = cmd + '-a "Author=' + author + '" ' cmd = cmd + '-a "Type=' + typ + '" ' cmd = cmd + '-a "Entry=' + entry + '" ' cmd = cmd + '-a "Title=' + title + '" ' cmd = cmd + '-a "Category=' + category + '" ' cmd = cmd + '-a "Domain=' + domain + '" ' for attachment in attachments: cmd = cmd + '-f "' + attachment + '" ' cmd = cmd + '-n ' + str(encoding) cmd = cmd + ' "' + message + '"' #print cmd #os.system (cmd) #print os.popen(cmd).read() import subprocess proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) (out, err) = proc.communicate() if (err is not None) and err!="": raise Exception(err) print out def get_plot_snapshots(title = None, file_type = "jpg", temp_path = get_context().setup.getContextPath()): """ Returns list with file names of plots snapshots from a plotting context. """ sleep(0.02) #Give some time to plot to be finished - it is not sync with acquisition ret = [] for p in get_plots(title): file_name = os.path.abspath(temp_path + "/" + p.getTitle() + "." + file_type) p.saveSnapshot(file_name , file_type) ret.append(file_name) return ret