#from ch.psi.pshell.imaging.Overlays import * import ch.psi.pshell.imaging.Overlay as Overlay import ch.psi.pshell.imaging.Pen as Pen import java.awt.Font as Font import java.awt.Point as Point import java.awt.Color as Color import ch.psi.pshell.crlogic.CrlogicPositioner as CrlogicPositioner import ch.psi.pshell.crlogic.CrlogicSensor as CrlogicSensor import ch.psi.pshell.ui.App as App #get_context().dataManager.provider.embeddedAtributes = False #get_context().dataManager.provider.ADD_ATTRIBUTE_FILE_TIMESTAMP = True ################################################################################################### # EPICS utilities ################################################################################################### def caget_str(ch): return ''.join((chr(i) if i else "") for i in caget(ch)) def caput_str(ch, val): ret = [ord(c) for c in val] ret = ret + ([0] * (256-len(ret))) ret = to_array(ret, 'b') caput(ch, ret) ################################################################################################### def on_command_started(info): pass #print "Start: " + str(info.script) def on_command_finished(info): pass #print "Finished: " + str(info.script) + " Error: " + str(info.error) def on_change_data_path(path): print "Data path: " + str(path) ################################################################################################### # Layout setup ################################################################################################### #import ch.psi.pshell.data.LayoutSF as LayoutSF #LayoutSF.setExperimentArguments([pv, motor, pe, cv, energy, sin]) #Librariesenergy #import Jama.Matrix #sys.path.append('/Users/gobbo_a/dev/pshell/config/home/script/Lib/diffcalc') crlogic_config = {} crlogic_config["class"] = "ch.psi.pshell.crlogic.CrlogicScan" crlogic_config["prefix"] = "MTEST-HW3-CRL" crlogic_config["ioc"] = "MTEST-VME-HW3.psi.ch" crlogic_config["integrationTime"] = 0.01 crlogic_config["additionalBacklash"] = 0.0 #LOCAL DEFINITIONS """ from ch.psi.pshell.imaging.Utils import * from ch.psi.pshell.imaging.Overlays import * import ch.psi.pshell.imaging.Pen as Pen import java.awt.Font as Font import java.awt.Point as Point old = ov if globals().has_key("ov") else None r = show_panel(img) ov1 = Text(Pen(Color.ORANGE), "TEXT", Font("Verdana", Font.PLAIN, 12) , Point(20,20), ) ov2 = Rect(Pen(Color.ORANGE),Point(50,50), Point(100,100), ) ov3 = Rect(Pen(Color.RED),Point(150,150), Point(200,200), ) ov4 = Rect(Pen(Color.GREEN),Point(250,250), Point(300,300), ) ov5 = Text(Pen(Color.ORANGE), "TEXT 2", Font("Verdana", Font.PLAIN, 12) , Point(-50,-50), ) ov5.setAnchor(Overlay.ANCHOR_VIEWPORT_OR_IMAGE_BOTTOM_RIGHT) ov5.setFixed(True) ov1.setFixed(True) ov2.setFixed(True) ov2.setAnchor(Overlay.ANCHOR_VIEWPORT_TOP_LEFT) ov2.setMovable(True) ov2.setSolid(True) ov3.setFixed(True) ov3.setMovable(True) ov3.setSolid(True) ov4.setMovable(True) ov4.setSolid(True) ov=[ov1,ov2, ov3, ov4, ov5] r.updateOverlays(ov, old) """ #from jeputils import * #det.data.monitored=True import random #State listener: cleanup can be made at end of execution def onStateChange(state, former, script): if state == State.Closing: #print "Closing the app" pass elif script is not None: if state.isProcessing(): #print "Started script " + script pass elif former.isProcessing(): #print "Finished script " + script pass class Listener(ContextListener): def onContextStateChanged(self, state, former): onStateChange(state, former, get_context().runningScriptName) def scan_e(start, end, step, settling_time = 0, accumulation_time = None, name = None): """ """ if name is not None: set_exec_pars(name = name) print get_exec_pars().path lscan(inp,out, start, end, step,settling_time) #Simulated Devices #run("tutorial/devices") #run("src/main/assembly/help/Tutorial/devices") #run("devices") class SimulatedOutput(Writable): def write(self, value): pass class SimulatedInput(Readable): def __init__(self): self.x = 0.0 def read(self): self.x = self.x + 0.1 noise = (random.random() - 0.5) / 20.0 return math.sin(self.x) + noise sout = SimulatedOutput() sinp = SimulatedInput() #for m in mu, delta, gamma, eta, chi, phi: # m.setSpeed(m.config.defaultSpeed) #Controler Evenrt Listener clistener = Listener() get_context().addListener(clistener) def trig_scienta(): time.sleep(1.0) en_val = None class SimulatedEnergy(Writable): def write(self, value): self.put(value) def put(self, value, timeout = None): global en_val en_val = value def close(self): pass @staticmethod def test (): return "asd" class SimulatedEnergyReadback(Readable): def read(self): global en_val return en_val; def get(self): return self.read() def close(self): pass sim_energy = SimulatedEnergy() sim_energy_readback = SimulatedEnergyReadback() class beam_ok(ReadonlyRegisterBase): def doRead(self): return True add_device(beam_ok(), True) #Device utilities def integrate_image(): data = scienta.data.read() #Integrate and plot (width,height) = scienta.getImageSize().tolist() integration = [] for i in range(width): p=0.0 for j in range(height): p=p+data[j*width+i] integration.append(p) return integration def trig_scienta(): #scienta.start() #scienta.waitReady(-1) print "Trig Scienta" time.sleep(0.1) pass #Pseudo-Devices class ImageIntegrator(ReadableArray): def getSize(self): (width,height) = scienta.getImageSize().tolist() return width def read(self): return to_array(integrate_image(),'d') integration = ImageIntegrator() """ class Interlock(DeviceAdapter): def onValueChanging(self, device, value, former): if value > 5.0: raise Exception("Interlocked") motor.addListener(Interlock()) """ #This procedude is used to emulate file names generated by FDA """ import ch.psi.pshell.data.LayoutTable class Layout( ch.psi.pshell.data.LayoutTable): def getLogFileName(self): return time.strftime('%Y%m%d%H%M%S') + '_' + get_exec_pars().name + '_logs' def getDatasetName(self, scan): return time.strftime('%Y%m%d%H%M%S') + '_' + get_exec_pars().name + '_' + str(get_exec_pars().index).zfill(4) get_context().dataManager.setLayout(Layout()) """ """ import ch.psi.pshell.data.LayoutTable class DataLayout( ch.psi.pshell.data.LayoutTable): def getLogFileName(self): return time.strftime('%Y%m%d_%H%M') + '_' + get_exec_pars().script + '_logs' def getDatasetName(self, scan): print get_exec_pars().count data_file = time.strftime('%Y%m%d_%H%M') + '_' + get_exec_pars().name + '_' + str(get_exec_pars().count).zfill(4) print "Opened data file: " + get_exec_pars().path + "/" + data_file return data_file get_context().dataManager.setLayout(DataLayout()) """ #Interlocks """ class MyInterlock1 (Interlock): #Motor and Positioners def __init__(self): Interlock.__init__(self, (motor, pe)) def check(self, (m, p)): if p>50.0 and (m<50 and m>40): return False return True interlock1 = MyInterlock1() """ """ #Motor group class MyInterlock2(Interlock): def __init__(self): Interlock.__init__(self, (table, pe)) def check(self, ((m1,m2), p)): if p<500 and (m1>4 and m2>4): return False return True interlock2 = MyInterlock2() """ """ #Discrete Positioner class MyInterlock3(Interlock): def __init__(self): Interlock.__init__(self, (tab, pe)) def check(self, (tab, p)): if p<500 and tab=="Out": return False return True interlock3 = MyInterlock3() """ """ import ch.psi.pshell.data.LayoutTable class DataLayout( ch.psi.pshell.data.LayoutTable): def getLogFileName(self): return time.strftime('%Y%m%d_%H%M') + '_' + get_exec_pars().name + '_logs' def getDatasetName(self, scan): print get_exec_pars().count data_file = time.strftime('%Y%m%d_%H%M') + '_' + get_exec_pars().name + '_' + str(get_exec_pars().count).zfill(4) print "Opened data file: " + get_exec_pars().path + "/" + data_file return data_file get_context().dataManager.setLayout(DataLayout()) """ #run("CPython/wrapper") class SimulatedImage(ReadonlyRegisterBase, ReadonlyRegisterMatrix, ReadableCalibratedMatrix): def __init__(self, size_x=1280, size_y=960, number_of_dead_pixels=100, noise=0.1, beam_size_x=100, beam_size_y=20): self.size_x, self.size_y, self.number_of_dead_pixels, self.noise, self.beam_size_x, self.beam_size_y = \ size_x, size_y, number_of_dead_pixels, noise, beam_size_x, beam_size_y self.sc = None def doRead(self): if self.sc is None: self.sc = new_simulated_camera(self.size_x, self.size_y, self.number_of_dead_pixels, self.noise, self.beam_size_x, self.beam_size_y) ret = get_image() return Convert.reshape(ret,self.size_y, self.size_x) def getWidth(self): return self.width def getHeight(self): return self.height def getCalibration(self): return MatrixCalibration(-1.0, 1.0, 0.0, 0.0) #add_device(RegisterMatrixSource("sim", SimulatedImage()), True) #sim.polling = -250 def get_next_fid(folder, prefix, ext): try: import glob files = glob.glob(folder + prefix + '*_*.dat') last = max(files) index = int (last[last.rfind('_')+1 : last.rfind('.')]) + 1 return index except: return 0 def wait_channel(name, value, timeout =None, type='s'): print "Waiting " + str(name) + " = " + str(value) cawait(name, value, timeout = timeout, type=type) print "Done waiting" def convert_file(input_file_name, output_file_name, field = 0, pol = 0, keithley_3 = False): sep = "\t" line_sep = "\n" with open(input_file_name) as inp: lines = inp.readlines() with open(output_file_name, "wb") as out: out.write("Energy" + sep + "rbkenergy" + sep + "Mag" + sep + "Pol" + sep + "Io" + sep + "TEY" + sep + "Norm" + line_sep) s = sep + " " #File format has a space before numeric values for line in lines[1:]: line = line.strip() if line=="": break try: (Ecrbk,CADC1, CADC2, NORM, CADC3, CADC4, MCurr, cffrbk, ID1Erbk, ID2Erbk, vTime) = line.split(" ") #out.write(Ecrbk + sep + Ecrbk + sep + str(field) + sep + str(pol) + sep + CADC1 + sep + CADC2 + sep + (CADC3 if keithley_3 else NORM) + line_sep) out.write(" " + Ecrbk + s + Ecrbk + s + str(field) + s + str(pol) + s + CADC1 + s + CADC2 + s + (CADC3 if keithley_3 else NORM) + line_sep) except: traceback.print_exc() def plot_file(file, title = None): """ """ table = Table.load(file, "\t", '#') plots = plot(table, title = title) from mathutils import estimate_peak_indexes, fit_gaussians, create_fit_point_list, Gaussian,fit_harmonic,HarmonicOscillator import java.awt.Color as Color def fit(ydata, xdata = None): if xdata is None: xdata = frange(0, len(ydata), 1) max_y= max(ydata) index_max = ydata.index(max_y) max_x= xdata[index_max] print "Max index:" + str(index_max), print " x:" + str(max_x), print " y:" + str(max_y) gaussians = fit_gaussians(ydata, xdata, [index_max,]) p = plot([ydata],["data"],[xdata], title="Fit" )[0] if gaussians[0] is None: #Fitting error p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY) print "Fitting error - using max value" return (None, max_x, None) (norm, mean, sigma) = gaussians[0] fitted_gaussian_function = Gaussian(norm, mean, sigma) scale_x = [float(min(xdata)), float(max(xdata)) ] points = max((len(xdata)+1), 100) resolution = (scale_x[1]-scale_x[0]) / points fit_y = [] fit_x = frange(scale_x[0],scale_x[1],resolution, True) for x in fit_x: fit_y.append(fitted_gaussian_function.value(x)) p.addSeries(LinePlotSeries("fit")) p.getSeries(1).setData(fit_x, fit_y) if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1])/2): print "Mean -> " + str(mean) p.addMarker(mean, None, "Mean="+str(round(norm,2)), Color.MAGENTA.darker()) return (norm, mean, sigma) else: p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY) print "Invalid gaussian fit: " + str(mean) + " - using max value" return (None, max_x, None) def hfit(ydata, xdata = None): if xdata is None: xdata = frange(0, len(ydata), 1) max_y= max(ydata) index_max = ydata.index(max_y) max_x= xdata[index_max] start,end = min(xdata), max(xdata) (amplitude, angular_frequency, phase) = fit_harmonic(ydata, xdata) fitted_harmonic_function = HarmonicOscillator(amplitude, angular_frequency, phase) print "amplitude = ", amplitude print "angular frequency = ", angular_frequency print "phase = ", phase f = angular_frequency/ (2* math.pi) print "frequency = ", f resolution = 0.01 fit_y = [] for x in frange(start,end,resolution, True): fit_y.append(fitted_harmonic_function.value(x)) fit_x = frange(start, end+resolution, resolution) p = plot(ydata,"data", xdata, title="HFit")[0] p.addSeries(LinePlotSeries("fit")) p.getSeries(1).setData(fit_x, fit_y) #m = (phase + math.pi)/ angular_frequency m = -phase / angular_frequency if (m0: plots[0].clear() def add_convex_hull_plot(title, x,y, name=None, clear = False, x_range = None, y_range = None): plots = get_plots(title = title) p = None if len(plots)==0: p = plot(None,name=name, title = title)[0] if x_range is not None: p.getAxis(p.AxisId.X).setRange(x_range[0], x_range[1]) if y_range is not None: p.getAxis(p.AxisId.Y).setRange(y_range[0], y_range[1]) p.setLegendVisible(True) else: p = plots[0] if clear: p.clear() p.addSeries(LinePlotSeries(name)) s = p.getSeries(name) s.setLinesVisible(False) s.setPointSize(3) x, y = to_array(x,'d') , to_array(y,'d') s.setData(x, y) #Convex Hull #In the first time the plot shows, it takes some time for the color to be assigned timeout = 0 while s.color is None and timeout<1000: time.sleep(0.001) timeout = timeout + 1 hull = LinePlotSeries(name + "Hull", s.color) p.addSeries(hull) #Bounding box #x1,x2,y1,y2 = min(x), max(x), min(y), max(y) #(hx,hy) = ([x1,x2, x2, x1, x1], [y1, y1, y2, y2, y1]) (hx,hy) = convex_hull(x=x, y=y) hx.append(hx[0]); hy.append(hy[0]) hull.setLineWidth(2) hull.setData(to_array(hx,'d') , to_array(hy,'d')) hull.setColor(s.color) return [hx,hy] class CamtoolValue(Readable): def __init__(self, channel, alias = None): self.channel=channel set_device_alias(self, channel if (alias is None) else alias) def read(self): return camtool.getValue(self.channel) class CamtoolArray(ReadableArray): def __init__(self, channel, alias = None): self.channel=channel set_device_alias(self, channel if (alias is None) else alias) def read(self): return camtool.getValue(self.channel) def getSize(self): return len(camtool.getValue(self.channel)) class CamtoolImage(ReadableMatrix): def __init__(self, alias = None): set_device_alias(self, camtool.getCurrentCamera() + " image" if (alias is None) else alias) def read(self): return camtool.getData().matrix def getWidth(self): return camtool.getData().width def getHeight(self): return camtool.getData().height def wait_camtool_message(number_messages = 1, timeout = 10000): for i in range (number_messages): if not camtool.stream.waitCacheChange(timeout): raise Exception("Timeout receiving from Camtool") def get_camtool_stats(number_images=1, async = True, interval=-1, good_region = False): #return [CamtoolValue("gr_x_fit_mean"), CamtoolValue("gr_y_fit_mean"), CamtoolValue("gr_x_fit_standard_deviation"), CamtoolValue("gr_y_fit_standard_deviation")] ret = [] wait_camtool_message() prefix = "gr_" if good_region else "" for ident in [prefix+"x_fit_mean", prefix+"y_fit_mean", prefix+"x_fit_standard_deviation", prefix+"y_fit_standard_deviation"]: child = camtool.stream.getChild(ident) av = create_averager(child, number_images, interval) av.monitored = async ret.append(av) return ret """ import ch.psi.fda.LayoutFDA class DataLayout( ch.psi.fda.LayoutFDA): def getLogFileName(self): return time.strftime('%Y%m%d_%H%M') + '_' + get_exec_pars().name + '_logs' def getDatasetName(self, scan): return time.strftime('%Y%m%d_%H%M') + '_' + get_exec_pars().name + '_' + str(get_exec_pars().count-1).zfill(4) get_context().dataManager.setLayout(DataLayout()) """ def is_panel(): return get_exec_pars().source == CommandSource.plugin def is_ui(): return get_exec_pars().source == CommandSource.ui class AnalogOutput(RegisterBase): def doRead(self): return self.val if hasattr(self, 'val') else 0.0 def doWrite(self, val): self.val = val class Sinusoid(ReadonlyRegisterBase): def doRead(self): noise = (random.random() - 0.5) * 0.15 return round(math.sin(1.0*time.time()) + noise , 3) class Bpm(ReadonlyRegisterBase): def doRead(self): noise = (random.random() - 0.5) * 0.15 return round(math.sin(0.5*math.radians(phase.read())) + noise , 3) class SinusoidWaveform(ReadonlyRegisterBase, ReadonlyRegisterArray): def doRead(self): time.sleep(0.001) ret = [] x = random.random() for i in range (20): ret.append(math.sin(x)) x = x + 0.1 return ret def getSize(self): return len(self.take(-1)) #only reads if cache is None class SinusoidImage(ReadonlyRegisterBase, ReadonlyRegisterMatrix): def doRead(self): time.sleep(0.001) (width, height) = (200, 100) ret = [] x = random.random(); base = [] for i in range (width): base.append( math.sin(x)) x = x + 0.05 for i in range (height): noise = (random.random() - 0.5)/5.0 ret.append([x+noise for x in base]) return to_array(ret,'d') def getWidth(self): return len(self.take(-1)[0]) def getHeight(self): return len(self.take(-1)) """ add_device(Bpm("bpm_x"), True) add_device(Sinusoid("amplitude"), True) add_device(Sinusoid("power"), True) add_device(SinusoidWaveform("waveform1"), True) add_device(SinusoidImage("detector1"), True) import ch.psi.pshell.imaging.RegisterMatrixSource as RegisterMatrixSource add_device(RegisterMatrixSource("image1", detector1), True) bpm_x.setPolling(100) waveform1.setPolling(1000) image1.setPolling(-200) add_device(DummyPositioner("phase"),True) """ class DigitalInput(ReadonlyRegisterBase): def doRead(self): return False if (int(time.time()) %2 == 0 ) else True add_device(DigitalInput("di"),True) di.polling=1000 ### Diffcalc GEOMETRY_PREFERENCE = "geometry" def get_geometry(): """ """ setting = get_setting(GEOMETRY_PREFERENCE) if setting is None or (len(setting.strip()) == 0): return None return setting def set_geometry(value, apply = None): """ """ if value is None or (len(value.strip()) == 0): set_setting(GEOMETRY_PREFERENCE, "" ) for name in "wavelength", "hkl_group", "h", "k", "l": dev = get_device(name) if dev is not None: remove_device(dev) return filename = get_context().setup.expandPath("{script}/geometry/"+ str(value)+".py") if not os.path.isfile(filename): raise Exception("Invalid geometry file: " + value) former = get_geometry() if ((apply is None) and former != value) or (apply==True) : set_setting(GEOMETRY_PREFERENCE, value ) run(filename) def is_geometry_set(): return get_device("wavelength") is not None #mu.moveAsync(0.0) #eta.moveAsync(0.0) #delta.moveAsync(45.0) #gamma.moveAsync(0.0) #energy.write(9.5)