Files
sf-rf/script/local.py
2016-07-01 08:01:48 +02:00

201 lines
6.6 KiB
Python
Executable File

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
from mathutils import estimate_peak_indexes, fit_gaussians, create_fit_point_list, Gaussian
from mathutils import fit_polynomial,fit_gaussian, fit_harmonic, calculate_peaks
from mathutils import PolynomialFunction, Gaussian, HarmonicOscillator
import math
import random
import java.awt.Color as Color
def fit(ydata, xdata = None):
"""
"""
if xdata is None:
xdata = frange(0, len(ydata), 1)
max_y= max(ydata)
index_max = ydata.index(max_y)
max_x= xdata[index_max]
print "Max index:" + str(index_max),
print " x:" + str(max_x),
print " y:" + str(max_y)
gaussians = fit_gaussians(ydata, xdata, [index_max,])
p = plot([ydata],["data"],[xdata], title="Fit" )[0]
if gaussians[0] is None:
p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY)
print "Fitting error"
return (None, None, None)
(norm, mean, sigma) = gaussians[0]
fitted_gaussian_function = Gaussian(norm, mean, sigma)
scale_x = [float(min(xdata)), float(max(xdata)) ]
points = max((len(xdata)+1), 100)
resolution = (scale_x[1]-scale_x[0]) / points
fit_y = []
fit_x = frange(scale_x[0],scale_x[1],resolution, True)
for x in fit_x:
fit_y.append(fitted_gaussian_function.value(x))
p.addSeries(LinePlotSeries("fit"))
p.getSeries(1).setData(fit_x, fit_y)
if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1])/2):
print "Mean -> " + str(mean)
p.addMarker(mean, None, "Mean="+str(round(norm,2)), Color.MAGENTA.darker())
return (norm, mean, sigma)
else:
p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY)
print "Invalid gaussian fit: " + str(mean)
return (None, None, None)
def hfit(ydata, xdata = None):
if xdata is None:
xdata = frange(0, len(ydata), 1)
max_y= max(ydata)
index_max = ydata.index(max_y)
max_x= xdata[index_max]
start,end = min(xdata), max(xdata)
(amplitude, angular_frequency, phase) = fit_harmonic(ydata, xdata)
fitted_harmonic_function = HarmonicOscillator(amplitude, angular_frequency, phase)
print "amplitude = ", amplitude
print "angular frequency = ", angular_frequency
print "phase = ", phase
f = angular_frequency/ (2* math.pi)
resolution = 0.01
fit_y = []
for x in frange(start,end,resolution, True):
fit_y.append(fitted_harmonic_function.value(x))
fit_x = frange(start, end+resolution, resolution)
p = plot(ydata,"data", xdata, title="HFit")[0]
p.addSeries(LinePlotSeries("fit"))
p.getSeries(1).setData(fit_x, fit_y)
#m = (phase + math.pi)/ angular_frequency
fit_max = -phase / angular_frequency
if (fit_max<start):
fit_max+=(1.0/f)
print "fit max= ", fit_max
p.addMarker(fit_max, None, "Fit max="+str(round(fit_max ,2)), Color.MAGENTA.darker())
return (amplitude, angular_frequency, phase, fit_max)
def elogllrf(subject, message, system, subsystem, section, attachments = []):
"""
Add entry to ELOG of LLRF: https://elog-gfa.psi.ch/LLRF/
Example: elogllrf("Hallo Test", "Das ist meine Nachricht...","LLRF", "General", "SINEG01")
"""
import getpass
author = "pshell/" + getpass.getuser()
cmd = 'G_CS_ELOG_add -l "LLRF" '
cmd = cmd + '-a "Author=' + author + '" '
cmd = cmd + '-a "Subject=' + subject + '" '
cmd = cmd + '-a "Section=' + section + '" '
cmd = cmd + '-a "System=' + system + '" '
cmd = cmd + '-a "Subsystem=' + subsystem + '" '
cmd = cmd + '-a "Machine=SwissFEL" '
cmd = cmd + '-a "Domain=All"'
for attachment in attachments:
cmd = cmd + ' -f "' + attachment + '" '
#print attachment
cmd = cmd + ' -x -n 1'
cmd = cmd + ' "' + message + '"'
print cmd
#os.system (cmd)
#print os.popen(cmd).read()
import subprocess
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
(out, err) = proc.communicate()
if (err is not None) and err!="":
raise Exception(err)
print out
def get_plot_snapshots(title = None, file_type = "jpg", temp_path = controller.setup.getContextPath()):
    """
    Save a snapshot of every plot of a plotting context to a file.

    Args:
        title(str, optional): passed to get_plots() to select the plotting context.
        file_type(str, optional): used as snapshot format and as file extension.
        temp_path(str, optional): folder where the files are created. The default
            value is evaluated only once, when this function is defined.

    Returns:
        List with the absolute file names of the snapshots, one for each plot.
    """
    def save(plot_panel):
        # The file is named after the plot title
        target = os.path.abspath(temp_path + "/" + plot_panel.getTitle() + "." + file_type)
        plot_panel.saveSnapshot(target, file_type)
        return target

    # Plotting is not synchronized with the acquisition: give it some time to finish
    sleep(0.02)
    return [save(plot_panel) for plot_panel in get_plots(title)]
class Sinusoid(ReadonlyRegisterBase):
    """
    Simulated read-only register returning the points of a noisy sine wave.

    Each read advances the angle 'x' (in degrees, starting at 0.0) by 'step'
    (10 unless the attribute was set before) and returns sin(x) plus a random
    offset in the range [-0.1, 0.1).
    """
    def doRead(self):
        if not hasattr(self, 'step'):
            self.step = 10
        # The first read starts at angle 0, the following ones advance by 'step'
        if hasattr(self, 'x'):
            self.x = self.x + self.step
        else:
            self.x = 0.0
        noise = (random.random() - 0.5) * 0.2
        sine = math.sin(self.x * math.pi / 180.0)
        return sine + noise

# Register the simulated device under the name "sim"
add_device(Sinusoid("sim"), True)
#------------- CAS --------------------
import ch.psi.pshell.epics.CAS as CAS
#CAS.setServerPort(5070)
class ArrayDevice(ReadonlyRegisterBase, ReadonlyRegisterArray):
    """
    Read-only array register whose content is provided by scripts through
    set() and append().

    NOTE(review): take() presumably returns the value last given to onReadout(),
    so reading just returns what was stored by set() - confirm.
    """
    def doRead(self):
        return self.take()

    def getSize(self):
        """Return the number of elements of the current value."""
        return len(self.take())

    def append(self, value):
        """Add 'value' to the end of the current content and publish the result."""
        c = self.take()
        c.append(value)
        self.set(c)

    def set(self, value):
        """Publish 'value', converted to an array of doubles, as the new content."""
        self.onReadout(to_array(value, 'd'))
class ScalarDevice(RegisterBase):
    """
    Register that just holds the last value written to it.
    Reads return 0.0 until the first write.
    """
    def doRead(self):
        return getattr(self, 'val', 0.0)

    def doWrite(self, val):
        self.val = val
# Devices holding the scan positions and values (arrays), and the scan range (scalars).
add_device(ArrayDevice("scan_pos"), True)
add_device(ArrayDevice("scan_val"), True)
add_device(ScalarDevice("scan_start"), True)
add_device(ScalarDevice("scan_stop"), True)
add_device(ScalarDevice("scan_step"), True)
# Initial values of the scan range.
scan_start.write(-10.0)
scan_stop.write(10.0)
scan_step.write(1.0)
# NOTE(review): the indentation of the statements below could not be recovered,
# confirm which of them belong to the body of this condition.
if not controller.localMode:
CAS.setServerPort(12345)
# One CAS object for each device, created with a "PSHELL:" channel name and type "double".
cas1=CAS("PSHELL:scanpos",scan_pos, "double")
cas2=CAS("PSHELL:scanval",scan_val, "double")
cas3=CAS("PSHELL:scanstart",scan_start, "double")
cas4=CAS("PSHELL:scanstop",scan_stop, "double")
cas5=CAS("PSHELL:scanstep",scan_step, "double")
#scan_pos.set([] )
#scan_val.set([])
#TODO: this is workaround to CAS not supporting dynamic arrays
# The arrays are created with a fixed size of MAX_ARRAY_DEV_SIZE, filled with zeros.
MAX_ARRAY_DEV_SIZE = 5000
scan_pos.set([0.0,] * MAX_ARRAY_DEV_SIZE )
scan_val.set([0.0,] * MAX_ARRAY_DEV_SIZE )