Files
cristallina/beamline/kb_focusing.py

616 lines
23 KiB
Python
Executable File

from cam_server_client import PipelineClient
from cam_server_client.utils import get_host_port_from_stream_address
from bsread import source, SUB
from epics import PV
import numpy as np
import pandas as pd
import time
import datetime
from pathlib import Path
import json
from loguru import logger
import matplotlib.pyplot as plt
import datetime
from numpy.linalg import inv
import os
from queue import Queue
wait_time_benders = 1.0 # can probably be reduced as we wait for the move to finish
wait_time_aperture = 0.5 # can probably be reduced as we wait for the move to finish
def get_position_from_pipeline(pip_instance_id, data_field_name, n_pulses=1):
pipeline_client = PipelineClient()
stream_address = pipeline_client.get_instance_stream(pip_instance_id)
stream_host, stream_port = get_host_port_from_stream_address(stream_address)
with source(host=stream_host, port=stream_port, mode=SUB) as input_stream:
sum_pos = 0
for i in np.arange(n_pulses):
input_stream.connect()
message = input_stream.receive()
pos = message.data.data[data_field_name].value
sum_pos = sum_pos + pos
mean_pos = sum_pos / n_pulses
return mean_pos
def kbV_focusing_acquire(intermediate_results,stop_measurement,out_fpath,
bender_ds_center = 1.55,
bender_us_center = 1.49,
bender_us_span = 0.5,
bender_ds_span = 0.5,
bender_npoints = 5,
bender_us_start = None,
bender_ds_start = None,
aperture_center =0,
aperture_span =0.3,
aperture_npoints=3,
aperture_narrow_dimension = 0.15,
aperture_wide_dimension = 2,
n_pulses=1,
camera_name = "SARES30-CAMS156-XE",
):
""" Vertical KB mirror focusing acquisition with default parameters.
# Important!
Make sure the camera in the screen_panel is oriented correctly - double-check by steering the beam or with the camera move.
The focusing script won't work without correct camera orientation as it relies on the gauss fits from the screen_pannel.
# Parameters to check every time
bender_us&ds_center center positions of the benders, should be near the expected values
bender_us&ds_span how far are benders scanned, usally 0.5mm is enough
bender_npoints how many bender positions are checked (same for each bender). 5 already takes a while, so be weary of raising too far
aperture_center: central position of the aperture, choose such that the beam path is well alignedx
aperture_span: how far to each side should be the apperture scanned (symmetrically), default is 0.3mm
aperture_npoints: how many aperture positions are checked, usually 3 is enough
# Parameters that can usually remain as they are
bender_us&ds_start should be 0.3 below maximum focus
aperture_narrow_dimension: default set to 0.15mm
aperture_wide_dimension: default set to 2mm
n_pulses: number of shots per position, usually one is enough
camera_name: normally we use our X-ray eye, so "SARES30-CAMS156-XE"
"""
aperture = np.linspace(aperture_center-aperture_span, aperture_center+aperture_span, aperture_npoints)
bender_us = np.linspace(bender_us_center-bender_us_span, bender_us_center+bender_us_span, bender_npoints)
bender_ds = np.linspace(bender_ds_center-bender_ds_span, bender_ds_center+bender_ds_span, bender_npoints)
if bender_us_start == None:
bender_us_start = np.min(bender_us) - 0.3
if bender_ds_start == None:
bender_ds_start = np.min(bender_ds) - 0.3
return kb_focusing_acquire(intermediate_results,stop_measurement,out_fpath,
direction="vertical",
bender_us=bender_us,
bender_ds=bender_ds,
bender_us_start=bender_us_start,
bender_ds_start=bender_ds_start,
aperture=aperture,
aperture_size=aperture_narrow_dimension,
aperture_size_pendicular=aperture_wide_dimension,
n_pulses=n_pulses,
camera_name =camera_name,
)
def kbH_focusing_acquire(intermediate_results,stop_measurement,out_fpath,
bender_ds_center = 1.6,
bender_us_center = 1.8,
bender_us_span = 0.5,
bender_ds_span = 0.5,
bender_npoints = 3,
bender_us_start = None,
bender_ds_start = None,
aperture_center =0,
aperture_span =0.3,
aperture_npoints=3,
aperture_narrow_dimension = 0.15,
aperture_wide_dimension = 2,
n_pulses=1,
camera_name = "SARES30-CAMS156-XE",
):
""" Horizontal KB mirror focusing acquisition with default parameters.
# Important!
Make sure the camera in the screen_panel is oriented correctly - double-check by steering the beam or with the camera move.
The focusing script won't work without correct camera orientation as it relies on the gauss fits from the screen_pannel.
# Parameters to check every time
bender_us&ds_center center positions of the benders, should be near the expected values
bender_us&ds_span how far are benders scanned, usally 0.5mm is enough
bender_npoints how many bender positions are checked (same for each bender). 5 already takes a while, so be weary of raising too far
aperture_center: central position of the aperture, choose such that the beam path is well alignedx
aperture_span: how far to each side should be the apperture scanned (symmetrically), default is 0.3mm
aperture_npoints: how many aperture positions are checked, usually 3 is enough
# Parameters that can usually remain as they are
bender_us&ds_start should be 0.3 below maximum focus
aperture_narrow_dimension: default set to 0.15mm
aperture_wide_dimension: default set to 2mm
n_pulses: number of shots per position, usually one is enough
camera_name: normally we use our X-ray eye, so "SARES30-CAMS156-XE"
"""
aperture = np.linspace(aperture_center-aperture_span, aperture_center+aperture_span, aperture_npoints)
bender_us = np.linspace(bender_us_center-bender_us_span, bender_us_center+bender_us_span, bender_npoints)
bender_ds = np.linspace(bender_ds_center-bender_ds_span, bender_ds_center+bender_ds_span, bender_npoints)
if bender_us_start == None:
bender_us_start = np.min(bender_us) - 0.3
if bender_ds_start == None:
bender_ds_start = np.min(bender_ds) - 0.3
return kb_focusing_acquire(intermediate_results,stop_measurement,out_fpath,
direction="horizontal",
bender_us=bender_us,
bender_ds=bender_ds,
bender_us_start=bender_us_start,
bender_ds_start=bender_ds_start,
aperture=aperture,
aperture_size=aperture_narrow_dimension,
aperture_size_pendicular=aperture_wide_dimension,
n_pulses=n_pulses,
camera_name =camera_name,
)
def kb_focusing_acquire(intermediate_results,stop_measurement,out_fpath,
direction=None,
bender_us=[1.49, 1.54, 1.59],
bender_ds=[1.79, 1.84, 1.89],
bender_us_start=1.29,
bender_ds_start=1.59,
aperture=[-0.3, 0, 0.3],
aperture_size=0.15,
aperture_size_pendicular=1.2,
n_pulses=1,
camera_name = "SARES30-CAMS156-XE",
):
"""
KB mirror focusing acquisition routine for Cristallina.
Function can run as a thread and be stopped any time with defining global variable stop_measurement as True'
"""
bender_us = np.sort(bender_us)
bender_ds = np.sort(bender_ds)
KBV_NAME = "SAROP31-OKBV153"
KBH_NAME = "SAROP31-OKBH154"
if direction == "vertical":
kb_name = KBV_NAME
elif direction == "horizontal":
kb_name = KBH_NAME
else:
raise KeyError('The direction must be given as "horizontal" or "vertical"')
BU = PV(kb_name + ":BU.VAL")
BD = PV(kb_name + ":BD.VAL")
# TODO: is the separation necessary?
BU_RB = PV(kb_name + ":BU.VAL")
BD_RB = PV(kb_name + ":BD.VAL")
# Aperture
aperture = np.sort(aperture)
APU_NAME = "SAROP31-OAPU149"
if direction == "vertical":
APU_CENTER = PV(APU_NAME + ":MOTOR_Y.VAL")
APU_CENTER_RB = PV(APU_NAME + ":MOTOR_Y.RBV")
APU_SIZE = PV(APU_NAME + ":MOTOR_H.VAL")
APU_SIZE_RB = PV(APU_NAME + ":MOTOR_H.RBV")
APU_SIZE_PERPENDICULAR = PV(APU_NAME + ":MOTOR_W.VAL")
APU_SIZE_PERPENDICULAR_RB = PV(APU_NAME + ":MOTOR_W.RBV")
APU_CENTER_PERPENDICULAR = PV(APU_NAME + ":MOTOR_X.VAL")
APU_CENTER_PERPENDICULAR_RB = PV(APU_NAME + ":MOTOR_X.RBV")
# Camera field name
data_field_name = "y_fit_mean"
elif direction == "horizontal":
APU_CENTER = PV(APU_NAME + ":MOTOR_X.VAL")
APU_SIZE = PV(APU_NAME + ":MOTOR_W.VAL")
APU_CENTER_RB = PV(APU_NAME + ":MOTOR_X.RBV")
APU_SIZE_RB = PV(APU_NAME + ":MOTOR_W.RBV")
APU_SIZE_PERPENDICULAR = PV(APU_NAME + ":MOTOR_H.VAL")
APU_SIZE_PERPENDICULAR_RB = PV(APU_NAME + ":MOTOR_H.RBV")
APU_CENTER_PERPENDICULAR = PV(APU_NAME + ":MOTOR_Y.VAL")
APU_CENTER_PERPENDICULAR_RB = PV(APU_NAME + ":MOTOR_Y.RBV")
# Camera field name
data_field_name = "x_fit_mean"
# Camera to be used
pip_instance_id = camera_name + "_sp"
### Acquisition start
# Get reference position
apu_center_ref = APU_CENTER_RB.get()
apu_size_ref = APU_SIZE_RB.get()
apu_size_perp_ref = APU_SIZE_PERPENDICULAR.get()
apu_center_perp_ref = APU_CENTER_PERPENDICULAR.get()
logger.info("BU/BD sent to start")
BU.put(bender_us_start, wait=False)
BD.put(bender_ds_start, wait=False)
APU_SIZE.put(aperture_size, wait=False)
APU_SIZE_PERPENDICULAR.put(aperture_size_pendicular, wait=False)
APU_CENTER.put(aperture[0], wait=False)
BU.put(bender_us_start, wait=True)
BD.put(bender_ds_start, wait=True)
time.sleep(wait_time_benders)
logger.info(f"BU to start: {bender_us_start:6.3f}")
logger.info(f"BD to start: {bender_ds_start:6.3f}")
bender_us_rb = []
bender_ds_rb = []
beam_positions = []
bender_scan_data = {}
datestr, timestr = generate_date_and_time_str()
for bu in bender_us:
if stop_measurement.is_set():
logger.info("Measurement aborted, will stop after this upstream bender position")
break
logger.info("")
BU.put(bu, wait=False)
for bd in bender_ds:
if stop_measurement.is_set():
logger.info("Measurement aborted, will stop after downstream bender position")
break
BU.put(bu, wait=False)
BD.put(bd, wait=False)
BU.put(bu, wait=True)
BD.put(bd, wait=True)
time.sleep(wait_time_benders)
logger.info(f" BU / BD positions = {bu:6.3f} / {bd:6.3f}")
bender_us_rb.append(BU_RB.get())
bender_ds_rb.append(BD_RB.get())
beam_pos = []
for apu in aperture:
APU_CENTER.put(apu, wait=True)
time.sleep(wait_time_aperture)
beam_pos_apu = get_position_from_pipeline(pip_instance_id, data_field_name, n_pulses=n_pulses)
logger.info(f" Aperture position = {apu:6.3f}; Beam position = {beam_pos_apu:6.3f}")
beam_pos.append(beam_pos_apu)
time.sleep(wait_time_aperture)
APU_CENTER.put(aperture[0], wait=False)
beam_positions.append(beam_pos)
intermediate_results.append([ BU_RB.get(), BD_RB.get(), beam_pos ])
BD.put(bender_ds_start, wait=True)
logger.info("")
logger.info(f"BD to start: {bender_ds_start:6.3f}")
time.sleep(wait_time_benders)
# save intermediate data
bender_scan_data["bender_us"] = bender_us_rb
bender_scan_data["bender_ds"] = bender_ds_rb
bender_scan_data["beam_positions"] = beam_positions
fpath = save_focusing_data(bender_scan_data, direction=direction, timestr=timestr, datestr=datestr)
res_filepath = convert_focusing_to_bender_data(fpath)
logger.info(f"BU to start: {bender_us_start:6.3f}")
APU_SIZE.put(apu_size_ref, wait=False)
APU_CENTER.put(apu_center_ref, wait=False)
APU_SIZE_PERPENDICULAR.put(apu_size_perp_ref, wait=False)
APU_CENTER_PERPENDICULAR.put(apu_center_perp_ref, wait=False)
BU.put(bender_us_start, wait=False)
BD.put(bender_ds_start, wait=False)
logger.info(f"Data saved to: {res_filepath}")
logger.info("Done")
out_fpath.put('')
return bender_scan_data
def generate_date_and_time_str():
t = datetime.datetime.now()
datestr = t.date().isoformat()
timestr = t.isoformat(timespec='minutes')
return datestr, timestr
def save_focusing_data(bender_scan_data, direction="unknown", timestr=None, datestr=None, beamline_directory="/sf/cristallina/applications/beamline/snapshots/KBs/"):
""" Saves bender focusing data to json for analysis in beamline directory.
"""
bender_scan_data["comment"] = "Cristallina bender focusing data"
if timestr is None:
datestr, timestr = generate_date_and_time_str()
directory = Path(beamline_directory) / datestr
directory.mkdir(parents=True, exist_ok=True)
fpath = directory / f"C_{timestr}_{direction}.json"
with open(fpath, "w") as f:
json.dump(bender_scan_data, f)
return fpath
def convert_focusing_to_bender_data(focusing_json_file):
""" Converts focusing data to text array for further processing.
"""
fpath = Path(focusing_json_file)
with open(fpath, "r") as f:
focusing_data = json.loads(f.read())
nrows = len(focusing_data['bender_us'])
arr = np.empty((nrows, 5))
arr[:, 0] = focusing_data['bender_us']
arr[:, 1] = focusing_data['bender_ds']
arr[:, 2:] = focusing_data['beam_positions']
Diff1 = arr[:,2] - arr[:,3]
Diff2 = arr[:,4] - arr[:,3]
# extend array with difference columns
arr = np.c_[arr, Diff1]
arr = np.c_[arr, Diff2]
out_fpath = fpath.with_suffix(".dat")
np.savetxt(out_fpath, arr)
return out_fpath
class ReadData:
'''Class to read either intermediate or final data written to file. Calculates beam movement for each bender position and the object can be fed to data plotting.'''
def __init__(self, datasource):
self.B1 =[]
self.B2 =[]
self.P1 =[]
self.P2 =[]
self.P3 =[]
# Check if the data is intermediate or final
if type(datasource) == list:
self.filename = "Intermediate data"
self.measured_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
for point in datasource:
self.B1.append(float(point[0]))
self.B2.append(float(point[1]))
self.P1.append(float(point[2][0]))
self.P2.append(float(point[2][1]))
self.P3.append(float(point[2][2]))
elif type(datasource) == str:
self.filename = datasource
print('open file: {}'.format(self.filename))
if self.filename[-4:] == 'json':
f = pd.read_json(self.filename)
# Get the file creation time (platform-dependent)
creation_time = os.path.getctime(self.filename)
# Convert to formatted string
formatted_time = datetime.datetime.fromtimestamp(creation_time).strftime("%Y-%m-%d %H:%M:%S")
self.measured_date = formatted_time
self.B1 = list(f['bender_us'])
self.B2 = list(f['bender_ds'])
for i in range(len(self.B1)):
self.P1.append(f['beam_positions'][i][0])
self.P2.append(f['beam_positions'][i][1])
self.P3.append(f['beam_positions'][i][2])
else:
f = open(self.filename, 'r')
for line in f:
line = line.strip(' \t\n\r')
print(line)
if 'Date' in line:
self.measured_date = line[-19:]
if (line and line[0]!='#'):
columns = line.split()
self.B1.append(float(columns[0]))
self.B2.append(float(columns[1]))
self.P1.append(float(columns[2]))
self.P2.append(float(columns[3]))
self.P3.append(float(columns[4]))
f.close() # close file
else:
raise KeyError("datasource must be given as a string (filepath) or intermediate data list.\n\nLikely the data is still being recorded")
self.B1=np.array(self.B1)
self.B2=np.array(self.B2)
self.P1=np.array(self.P1)
self.P2=np.array(self.P2)
self.P3=np.array(self.P3)
self.D1=self.P1- self.P2
self.D3=self.P3- self.P2
# Plot data
def plotdata(H,Plot3D=True,PlotContour=False,show_figs=True):
'''Plot data'''
imax = H.B1.shape[0]
assert imax >= 4, f'There must be at least 4 points to evaluate the data, so far there are {imax} datapoints.'
ii = np.arange(imax)
#-------------- generate 2d matrix for positions ------------------------------
nx = 10
X1 = np.min(H.B1); X2=np.max(H.B1); dX = (X2-X1)/nx
x1 = np.arange(nx+1) * dX + X1
ny = 10
Y1 = np.min(H.B2); Y2=np.max(H.B2); dY = (Y2-Y1)/ny
y1 = np.arange(ny+1) * dY + Y1
b1 = np.tile(x1,(ny+1, 1)) # create 2d position matrix,
b2 = np.tile(y1,(nx+1, 1)) #
b2 = b2.transpose()
#--------------- Analytical: Fit Plane to 2d-data----------------------
A = np.array([
np.ones(np.size(H.B1)),
H.B1[:],
H.B2[:]
]).T
b = np.array([H.D1[:]]).T
fit1 = np.dot(np.dot(inv(np.dot(A.T, A)), A.T), b)
M1 = fit1[0] + fit1[1] * b1 + fit1[2] * b2
b = np.array([H.D3[:]]).T
fit3 = np.dot(np.dot(inv(np.dot(A.T, A)), A.T), b)
M3 = fit3[0] + fit3[1] * b1 + fit3[2] * b2
G1 = -(fit1[0] + fit1[1]*x1) / fit1[2]
G3 = -(fit3[0] + fit3[1]*x1) / fit3[2]
m1= -fit1[1] / fit1[2] ; n1 = -fit1[0] / fit1[2]
m3= -fit3[1] / fit3[2] ; n3 = -fit3[0] / fit3[2]
xs = (n3-n1)/(m1-m3)
ys = m1*xs + n1 # ys = m3*xs + n3
#-------------- Fit both beamlets ------------------------------
ys=np.atleast_1d(ys)
xs=np.atleast_1d(xs)
##################################### P L O T ###############################33
if Plot3D:
fig1 = plt.figure('3D',figsize=(10,10))
plt.clf()
ax = fig1.add_subplot(111, projection='3d')
ax.plot(H.B1, H.B2, H.D1, 'o', color="red")
ax.plot(H.B1, H.B2, H.D3, 'o', color="blue")
ax.set_xlabel("B1 (mm)")
ax.set_ylabel("B2 (mm)")
ax.set_zlabel('Difference (pxl)')
ax.set_title(f'{H.filename}'+'\n'+'\n'+"Best bender setting (B1, B2) = {0:4.3f} {1:4.3f}".format(xs[0],ys[0]))
ax.plot_wireframe(b1, b2, M1, rstride=1, cstride=1,color="red")
ax.plot_wireframe(b1, b2, M3, rstride=1, cstride=1,color="blue")
bottom=ax.get_zlim()[0]
size=1
plt.xlim([X1-size*dX,X2+size*dX])
plt.ylim([Y1-size*dY,Y2+size*dY])
# Mask G1, G3 to be inside lims
size=5
for i in np.arange(len(G1)):
if G1[i] < Y1-size*dY:
G1[i] = np.nan
elif G1[i] > Y2+size*dY:
G1[i] = np.nan
else:
pass
ax.plot(x1,G1,0, color="darkred", lw=3)
ax.plot(x1,G3,0, color="darkblue", lw=3)
ax.plot(xs,ys,0,'o', color="black", markersize=12)
outstring="B1={0:4.3f} B2={1:4.3f}".format(xs[0], ys[0])
if H.filename !='Intermediate data':
figname = H.filename+"_3d.png"
else:
figname = 'Intermediate data - 3D'
fig1.text(1.0,0.0,H.measured_date + " "+figname,fontsize=12 , va='bottom', ha='right')
# fig1.text(0, 0, __file__ , transform=fig1.transFigure, color= 'grey', verticalalignment='bottom', horizontalalignment='left',rotation=0)
bot =ax.get_zlim()[0]
# print("bottom=",bot)
# cp1= ax.contour(b1, b2, M1, 10, lw=3, cmap="autumn_r", offset=bot, linestyles='dashed' )
# cp3= ax.contour(b1, b2, M3, 10, lw=3, cmap="autumn_r", offset=bot, linestyles='solid' )
# plt.clabel(cp1, inline=True, fontsize=10, colors = 'k', fmt = '%3.2f')
# plt.clabel(cp3, inline=True, fontsize=10, colors = 'k', fmt = '%3.2f')
plt.plot(xs , ys , bot,'o', color="black", markersize=12)
plt.plot(H.B1, H.B2, bot,'o', color="black", markersize=4)
# ax.plot(x1,G1,bot, color="darkred")
# ax.plot(x1,G3,bot, color="darkblue")
if H.filename !='Intermediate data':
plt.savefig(figname)
if PlotContour:
# fig2 = plt.figure(2,figsize=(10,8))
fig2 = plt.figure('contour',figsize=(10,10))
plt.clf()
ax = plt.gca()
plt.xlabel("B1 (mm)")
plt.ylabel("B2 (mm)")
plt.title(H.filename)
cp1= plt.contour(b1, b2, M1 , linestyles='dashed' )
cp3= plt.contour(b1, b2, M3 , linestyles='solid')
# proxy1 = plt.Rectangle((0, 1), 1, 3, label='foo')
# proxy2 = plt.Rectangle((0, 0), 1, 1, label='bar')
# plt.legend(handles=[proxy1, proxy2], labels=['diff 1', 'diff2'])
plt.clabel(cp1, inline=True, fontsize=10, colors = 'k', fmt = '%3.2f')
plt.clabel(cp3, inline=True, fontsize=10, colors = 'k', fmt = '%3.2f')
plt.plot(x1,G1, color="black", ls='dashed',lw=2)
plt.plot(x1,G3, color="black",ls='solid',lw=2)
plt.plot(xs,ys,'o', color="black", markersize=12)
plt.plot(H.B1,H.B2,'o')
# sys.exit()
size=1
plt.xlim([X1-size*dX,X2+size*dX])
plt.ylim([Y1-size*dY,Y2+size*dY])
# plt.grid(b=True, which='both', color='0.65',linestyle='-')
outstring="B1={0:4.3f} B2={1:4.3f}".format(xs[0], ys[0])
plt.text(xs[0], ys[0] , outstring , va='top' ,fontsize=10, bbox=dict(facecolor='white', alpha=0.5))
plt.text(xs[0], ys[0] , outstring , va='top' ,fontsize=10, bbox=dict(facecolor='white', alpha=0.5))
# plt.text(0, 0, __file__ , transform=fig2.transFigure, color= 'grey', verticalalignment='bottom', horizontalalignment='left',rotation=0)
plt.text(0.02,0.04 , " dashed : delta1\n dotted : delta3", transform=ax.transAxes, va='top' ,fontsize=10, bbox=dict(facecolor='white', alpha=0.5))
ax.set_title(f'{H.filename}'+'\n'+'\n'+"Best bender setting (B1, B2) = {0:4.3f} {1:4.3f}".format(xs[0],ys[0]))
figname =H.filename+"_contour.png"
fig2.text(1.0,0.0,H.measured_date + " "+figname,fontsize=12 , va='bottom',ha='right')
if H.filename !='Intermediate data':
plt.savefig(figname)
if show_figs:
plt.show() # show plots on screen