Files
saresc/script/local.py
2026-01-09 16:22:50 +01:00

370 lines
14 KiB
Python
Executable File

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
from mathutils import estimate_peak_indexes, fit_gaussians, create_fit_point_list, Gaussian
from mathutils import fit_polynomial,fit_gaussian, fit_harmonic, calculate_peaks
from mathutils import PolynomialFunction, Gaussian, HarmonicOscillator
import java.awt.Color as Color
###################################################################################################
# Layout setup
###################################################################################################
import ch.psi.pshell.data.LayoutSF as LayoutSF
LayoutSF.setExperimentArguments([charge, laser, rep_rate_bunch_1, rep_rate_bunch_2, destination_AR, energy_AR])
###################################################################################################
# Machine utilities
###################################################################################################
LASER_SETTLING_TIME = 3.0
def laser_on(bunch=None):
print "Laser On - bunch “ + str(bunch)
#caput("SIN-TIMAST-TMA:Beam-Las-Delay-Sel", 0)
if (bunch==1) or (bunch is None):
caput("SWISSFEL-STATUS:Bunch-1-OnDelay-Sel", 0)
if (bunch==2) or (bunch is None):
caput("SWISSFEL-STATUS:Bunch-2-OnDelay-Sel", 0)
caput("SIN-TIMAST-TMA:Beam-Apply-Cmd.PROC", 1)
time.sleep(LASER_SETTLING_TIME)
def laser_off(bunch=None):
print "Laser Off - bunch “ + str(bunch)
#caput("SIN-TIMAST-TMA:Beam-Las-Delay-Sel", 1)
if (bunch==1) or (bunch is None):
caput("SWISSFEL-STATUS:Bunch-1-OnDelay-Sel", 1)
if (bunch==2) or (bunch is None):
caput("SWISSFEL-STATUS:Bunch-2-OnDelay-Sel", 1)
caput("SIN-TIMAST-TMA:Beam-Apply-Cmd.PROC", 1)
time.sleep(LASER_SETTLING_TIME)
def is_laser_on(bunch=None):
#return (caget ("SIN-TIMAST-TMA:Beam-Las-Delay-Sel",'d') == 0 )
if bunch==1:
return (caget ("SWISSFEL-STATUS:Bunch-1-OnDelay-Sel",'d') == 0 )
if bunch==2:
return (caget ("SWISSFEL-STATUS:Bunch-2-OnDelay-Sel",'d') == 0 )
if bunch is None:
return is_laser_on(1) and is_laser_on(2)
def save_laser_state():
global laser_was_on_1, laser_was_on_2
laser_was_on_1 = is_laser_on(1)
laser_was_on_2 = is_laser_on(2)
def restore_laser_state():
global laser_was_on_1, laser_was_on_2
if laser_was_on_1:
laser_on(1)
else:
laser_off(1)
if laser_was_on_2:
laser_on(2)
else:
laser_off(2)
def get_beam_ok_channel(bunch):
if bunch==2:
return "SIN-CVME-TIFGUN-EVR0:BUNCH-2-OK"
if bunch==1:
return "SIN-CVME-TIFGUN-EVR0:BUNCH-1-OK"
def is_timing_ok():
return caget("SIN-TIMAST-TMA:SOS-COUNT-CHECK") == 0
def get_repetition_rate(bunch=1, setp=None):
if not setp:
if bunch==2:
ret = caget(c, 'd')
else:
ret = caget("SIN-TIMAST-TMA:Bunch-1-Appl-Freq-RB", 'd')
if setp==False or ret > 0:
return ret
print "Readback is 0: returning Setpoint"
sel = caget("SIN-TIMAST-TMA:Bunch-" + str(bunch) + "-Freq-Sel")
return float(sel.split(" ")[0])
###################################################################################################
# Maths utilities
###################################################################################################
def fit(ydata, xdata = None):
"""
Gaussian fit
"""
if xdata is None:
xdata = frange(0, len(ydata), 1)
#ydata = to_list(ydata)
#xdata = to_list(xdata)
max_y= max(ydata)
index_max = ydata.index(max_y)
max_x= xdata[index_max]
print "Max index:" + str(index_max),
print " x:" + str(max_x),
print " y:" + str(max_y)
gaussians = fit_gaussians(ydata, xdata, [index_max,])
(norm, mean, sigma) = gaussians[0]
p = plot([ydata],["data"],[xdata], title="Fit" )[0]
fitted_gaussian_function = Gaussian(norm, mean, sigma)
scale_x = [float(min(xdata)), float(max(xdata)) ]
points = max((len(xdata)+1), 100)
resolution = (scale_x[1]-scale_x[0]) / points
fit_y = []
fit_x = frange(scale_x[0],scale_x[1],resolution, True)
for x in fit_x:
fit_y.append(fitted_gaussian_function.value(x))
p.addSeries(LinePlotSeries("fit"))
p.getSeries(1).setData(fit_x, fit_y)
if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1])/2):
print "Mean -> " + str(mean)
p.addMarker(mean, None, "Mean="+str(round(norm,2)), Color.MAGENTA.darker())
return (norm, mean, sigma)
else:
p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY)
print "Invalid gaussian fit: " + str(mean)
return (None, None, None)
def hfit(ydata, xdata = None):
"""
Harmonic fit
"""
if xdata is None:
xdata = frange(0, len(ydata), 1)
max_y= max(ydata)
index_max = ydata.index(max_y)
max_x= xdata[index_max]
start,end = min(xdata), max(xdata)
(amplitude, angular_frequency, phase) = fit_harmonic(ydata, xdata)
fitted_harmonic_function = HarmonicOscillator(amplitude, angular_frequency, phase)
print "amplitude = ", amplitude
print "angular frequency = ", angular_frequency
print "phase = ", phase
f = angular_frequency/ (2* math.pi)
print "frequency = ", f
resolution = 4.00 # 1.00
fit_y = []
for x in frange(start,end,resolution, True):
fit_y.append(fitted_harmonic_function.value(x))
fit_x = frange(start, end+resolution, resolution)
p = plot(ydata,"data", xdata, title="HFit")[0]
p.addSeries(LinePlotSeries("fit"))
p.getSeries(1).setData(fit_x, fit_y)
#m = (phase + math.pi)/ angular_frequency
m = -phase / angular_frequency
if (m<start):
m+=(1.0/f)
if start <=m <=end:
print "fit = ", m
p.addMarker(m, None, "Fit="+str(round(m ,2)), Color.MAGENTA.darker())
return (amplitude, angular_frequency, phase, True, m, fit_x, fit_y)
else:
print "max = ",max_x
p.addMarker(max_x, None, "Max="+str(round(max_x ,2)), Color.MAGENTA.darker())
return (amplitude, angular_frequency, phase, False, max_x, fit_x, fit_y)
def clear_convex_hull_plot(title):
plots = get_plots(title = title)
if len(plots)>0:
plots[0].clear()
def add_convex_hull_plot(title, x,y, name=None, clear = False, x_range = None, y_range = None):
plots = get_plots(title = title)
p = None
if len(plots)==0:
p = plot(None,name=name, title = title)[0]
if x_range is not None:
p.getAxis(p.AxisId.X).setRange(x_range[0], x_range[1])
if y_range is not None:
p.getAxis(p.AxisId.Y).setRange(y_range[0], y_range[1])
p.setLegendVisible(True)
else:
p = plots[0]
if clear:
p.clear()
p.addSeries(LinePlotSeries(name))
s = p.getSeries(name)
s.setLinesVisible(False)
s.setPointSize(3)
x, y = to_array(x,'d') , to_array(y,'d')
s.setData(x, y)
#Convex Hull
#In the first time the plot shows, it takes some time for the color to be assigned
timeout = 0
while s.color is None and timeout<1000:
time.sleep(0.001)
timeout = timeout + 1
hull = LinePlotSeries(name + "Hull", s.color)
p.addSeries(hull)
#Bounding box
#x1,x2,y1,y2 = min(x), max(x), min(y), max(y)
#(hx,hy) = ([x1,x2, x2, x1, x1], [y1, y1, y2, y2, y1])
(hx,hy) = convex_hull(x=x, y=y)
hx.append(hx[0]); hy.append(hy[0])
hull.setLineWidth(2)
hull.setData(to_array(hx,'d') , to_array(hy,'d'))
hull.setColor(s.color)
return [hx,hy]
###################################################################################################
# Tools
###################################################################################################
def elog(title, message, attachments = [], application = None, author = None, category = "Info", domain = "", logbook = "SwissFEL commissioning data", encoding=1):
"""
Add entry to ELOG.
"""
if author is None:
author = "pshell" #get_context().user.name
if application is None:
application = get_exec_pars().name
typ = "pshell"
entry = ""
cmd = 'G_CS_ELOG_add -l "' + logbook + '" '
cmd = cmd + '-a "Author=' + author + '" '
cmd = cmd + '-a "Type=' + typ + '" '
cmd = cmd + '-a "Entry=' + entry + '" '
cmd = cmd + '-a "Title=' + title + '" '
cmd = cmd + '-a "Category=' + category + '" '
cmd = cmd + '-a "Domain=' + domain + '" '
cmd = cmd + '-a "Application=' + application + '" '
for attachment in attachments:
cmd = cmd + '-f "' + attachment + '" '
cmd = cmd + '-n ' + str(encoding)
cmd = cmd + ' "' + message + '"'
#print cmd
#os.system (cmd)
#print os.popen(cmd).read()
import subprocess
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
(out, err) = proc.communicate()
if (err is not None) and err!="":
raise Exception(err)
print out
import ch.psi.pshell.ui.App as App
if not App.isOutputRedirected():
import java.lang.System as System
System.out.println(out)
try:
return int(out[out.find("ID=") +3 : ])
except:
print out
###################################################################################################
# Pseudo-devices
###################################################################################################
class Sinusoid(ReadonlyRegisterBase):
def doRead(self):
self.x = self.x + 5.0 if hasattr(self, 'x') else 0.0
return math.sin(self.x * math.pi / 180.0)
add_device(Sinusoid("sim_sensor"), True)
sim_sensor.polling=1000
add_device(DummyPositioner("sim_positioner"), True)
###################################################################################################
# Camera server
###################################################################################################
def wait_cam_server_message(number_messages = 1, timeout = 10000):
for i in range (number_messages):
if not cam_server.stream.waitCacheChange(timeout):
raise Exception("Timeout receiving from camera server")
def get_cam_server_stats(number_images=1, async = True, interval=-1, good_region = False):
ret = []
wait_cam_server_message()
prefix = "gr_" if good_region else ""
for ident in [prefix+"x_center_of_mass", prefix+"y_center_of_mass", prefix+"x_rms", prefix+"y_rms"]:
child = cam_server.stream.getChild(ident)
av = create_averager(child, number_images, interval)
av.monitored = async
ret.append(av)
return ret
def wait_cam_server_background(background, timeout = 10000):
start = time.time()
while True:
processing_parameters = cam_server.getProcessingParameters()
if (processing_parameters is not None) and (str(background) == processing_parameters["image_background"]):
return
if (time.time()-start) > timeout/1000:
raise Exception("Timeout waiting for camera server background: " + str(background))
time.sleep(0.01)
###################################################################################################
# Camera scans
###################################################################################################
def setup_camera_scan():
global camera_name, bpm_name, number_images, use_background, multiple_background, number_backgrounds, dry_run
if not is_laser_on(1) and not is_laser_on(2):
raise Exception("Both bunches are on delay")
save_laser_state()
multiple_background = multiple_background and use_background
cam_server.start(camera_name + "_sp", use_screen_panel_stream)
if use_background:
if not dry_run:
laser_off()
bg=cam_server.captureBackground(1 if multiple_background else number_backgrounds)
cam_server.setBackgroundSubtraction(True)
if not multiple_background: wait_cam_server_background(bg)
else:
cam_server.setBackgroundSubtraction(False)
if not multiple_background:
if not dry_run:
restore_laser_state()
def before_sample_camera_scan():
global camera_name, number_images, use_background, multiple_background, number_backgrounds, dry_run
if multiple_background:
bg = cam_server.captureBackground(number_backgrounds)
wait_cam_server_background(bg)
if not dry_run:
#laser_on()
restore_laser_state()
wait_cam_server_message(number_images)
def after_sample_camera_scan():
if multiple_background:
if not dry_run:
laser_off()
def get_camera_scan_sensors():
global camera_name, number_images, use_background, multiple_background, number_backgrounds, dry_run
sensors = get_cam_server_stats(number_images, good_region=use_good_region)
if plot_image:
sensors.append(cam_server.getDataMatrix())
return sensors
def end_camera_scan():
global camera_name, number_images, use_background, multiple_background, number_backgrounds, dry_run
if not dry_run:
restore_laser_state()