from cam_server_client import PipelineClient from cam_server_client.utils import get_host_port_from_stream_address from bsread import source, SUB from epics import PV import numpy as np import pandas as pd import time import datetime from pathlib import Path import json from loguru import logger import matplotlib.pyplot as plt import datetime from numpy.linalg import inv import os from queue import Queue wait_time_benders = 1.0 # can probably be reduced as we wait for the move to finish wait_time_aperture = 0.5 # can probably be reduced as we wait for the move to finish def get_position_from_pipeline(pip_instance_id, data_field_name, n_pulses=1): pipeline_client = PipelineClient() stream_address = pipeline_client.get_instance_stream(pip_instance_id) stream_host, stream_port = get_host_port_from_stream_address(stream_address) with source(host=stream_host, port=stream_port, mode=SUB) as input_stream: sum_pos = 0 for i in np.arange(n_pulses): input_stream.connect() message = input_stream.receive() pos = message.data.data[data_field_name].value sum_pos = sum_pos + pos mean_pos = sum_pos / n_pulses return mean_pos def kbV_focusing_acquire(intermediate_results,stop_measurement,out_fpath, bender_ds_center = 1.55, bender_us_center = 1.49, bender_us_span = 0.5, bender_ds_span = 0.5, bender_npoints = 5, bender_us_start = None, bender_ds_start = None, aperture_center =0, aperture_span =0.3, aperture_npoints=3, aperture_narrow_dimension = 0.15, aperture_wide_dimension = 2, n_pulses=1, camera_name = "SARES30-CAMS156-XE", ): """ Vertical KB mirror focusing acquisition with default parameters. # Important! Make sure the camera in the screen_panel is oriented correctly - double-check by steering the beam or with the camera move. The focusing script won't work without correct camera orientation as it relies on the gauss fits from the screen_pannel. # Parameters to check every time bender_us&ds_center center positions of the benders, should be near the expected values bender_us&ds_span how far are benders scanned, usally 0.5mm is enough bender_npoints how many bender positions are checked (same for each bender). 5 already takes a while, so be weary of raising too far aperture_center: central position of the aperture, choose such that the beam path is well alignedx aperture_span: how far to each side should be the apperture scanned (symmetrically), default is 0.3mm aperture_npoints: how many aperture positions are checked, usually 3 is enough # Parameters that can usually remain as they are bender_us&ds_start should be 0.3 below maximum focus aperture_narrow_dimension: default set to 0.15mm aperture_wide_dimension: default set to 2mm n_pulses: number of shots per position, usually one is enough camera_name: normally we use our X-ray eye, so "SARES30-CAMS156-XE" """ aperture = np.linspace(aperture_center-aperture_span, aperture_center+aperture_span, aperture_npoints) bender_us = np.linspace(bender_us_center-bender_us_span, bender_us_center+bender_us_span, bender_npoints) bender_ds = np.linspace(bender_ds_center-bender_ds_span, bender_ds_center+bender_ds_span, bender_npoints) if bender_us_start == None: bender_us_start = np.min(bender_us) - 0.3 if bender_ds_start == None: bender_ds_start = np.min(bender_ds) - 0.3 return kb_focusing_acquire(intermediate_results,stop_measurement,out_fpath, direction="vertical", bender_us=bender_us, bender_ds=bender_ds, bender_us_start=bender_us_start, bender_ds_start=bender_ds_start, aperture=aperture, aperture_size=aperture_narrow_dimension, aperture_size_pendicular=aperture_wide_dimension, n_pulses=n_pulses, camera_name =camera_name, ) def kbH_focusing_acquire(intermediate_results,stop_measurement,out_fpath, bender_ds_center = 1.6, bender_us_center = 1.8, bender_us_span = 0.5, bender_ds_span = 0.5, bender_npoints = 3, bender_us_start = None, bender_ds_start = None, aperture_center =0, aperture_span =0.3, aperture_npoints=3, aperture_narrow_dimension = 0.15, aperture_wide_dimension = 2, n_pulses=1, camera_name = "SARES30-CAMS156-XE", ): """ Horizontal KB mirror focusing acquisition with default parameters. # Important! Make sure the camera in the screen_panel is oriented correctly - double-check by steering the beam or with the camera move. The focusing script won't work without correct camera orientation as it relies on the gauss fits from the screen_pannel. # Parameters to check every time bender_us&ds_center center positions of the benders, should be near the expected values bender_us&ds_span how far are benders scanned, usally 0.5mm is enough bender_npoints how many bender positions are checked (same for each bender). 5 already takes a while, so be weary of raising too far aperture_center: central position of the aperture, choose such that the beam path is well alignedx aperture_span: how far to each side should be the apperture scanned (symmetrically), default is 0.3mm aperture_npoints: how many aperture positions are checked, usually 3 is enough # Parameters that can usually remain as they are bender_us&ds_start should be 0.3 below maximum focus aperture_narrow_dimension: default set to 0.15mm aperture_wide_dimension: default set to 2mm n_pulses: number of shots per position, usually one is enough camera_name: normally we use our X-ray eye, so "SARES30-CAMS156-XE" """ aperture = np.linspace(aperture_center-aperture_span, aperture_center+aperture_span, aperture_npoints) bender_us = np.linspace(bender_us_center-bender_us_span, bender_us_center+bender_us_span, bender_npoints) bender_ds = np.linspace(bender_ds_center-bender_ds_span, bender_ds_center+bender_ds_span, bender_npoints) if bender_us_start == None: bender_us_start = np.min(bender_us) - 0.3 if bender_ds_start == None: bender_ds_start = np.min(bender_ds) - 0.3 return kb_focusing_acquire(intermediate_results,stop_measurement,out_fpath, direction="horizontal", bender_us=bender_us, bender_ds=bender_ds, bender_us_start=bender_us_start, bender_ds_start=bender_ds_start, aperture=aperture, aperture_size=aperture_narrow_dimension, aperture_size_pendicular=aperture_wide_dimension, n_pulses=n_pulses, camera_name =camera_name, ) def kb_focusing_acquire(intermediate_results,stop_measurement,out_fpath, direction=None, bender_us=[1.49, 1.54, 1.59], bender_ds=[1.79, 1.84, 1.89], bender_us_start=1.29, bender_ds_start=1.59, aperture=[-0.3, 0, 0.3], aperture_size=0.15, aperture_size_pendicular=1.2, n_pulses=1, camera_name = "SARES30-CAMS156-XE", ): """ KB mirror focusing acquisition routine for Cristallina. Function can run as a thread and be stopped any time with defining global variable stop_measurement as True' """ bender_us = np.sort(bender_us) bender_ds = np.sort(bender_ds) KBV_NAME = "SAROP31-OKBV153" KBH_NAME = "SAROP31-OKBH154" if direction == "vertical": kb_name = KBV_NAME elif direction == "horizontal": kb_name = KBH_NAME else: raise KeyError('The direction must be given as "horizontal" or "vertical"') BU = PV(kb_name + ":BU.VAL") BD = PV(kb_name + ":BD.VAL") # TODO: is the separation necessary? BU_RB = PV(kb_name + ":BU.VAL") BD_RB = PV(kb_name + ":BD.VAL") # Aperture aperture = np.sort(aperture) APU_NAME = "SAROP31-OAPU149" if direction == "vertical": APU_CENTER = PV(APU_NAME + ":MOTOR_Y.VAL") APU_CENTER_RB = PV(APU_NAME + ":MOTOR_Y.RBV") APU_SIZE = PV(APU_NAME + ":MOTOR_H.VAL") APU_SIZE_RB = PV(APU_NAME + ":MOTOR_H.RBV") APU_SIZE_PERPENDICULAR = PV(APU_NAME + ":MOTOR_W.VAL") APU_SIZE_PERPENDICULAR_RB = PV(APU_NAME + ":MOTOR_W.RBV") APU_CENTER_PERPENDICULAR = PV(APU_NAME + ":MOTOR_X.VAL") APU_CENTER_PERPENDICULAR_RB = PV(APU_NAME + ":MOTOR_X.RBV") # Camera field name data_field_name = "y_fit_mean" elif direction == "horizontal": APU_CENTER = PV(APU_NAME + ":MOTOR_X.VAL") APU_SIZE = PV(APU_NAME + ":MOTOR_W.VAL") APU_CENTER_RB = PV(APU_NAME + ":MOTOR_X.RBV") APU_SIZE_RB = PV(APU_NAME + ":MOTOR_W.RBV") APU_SIZE_PERPENDICULAR = PV(APU_NAME + ":MOTOR_H.VAL") APU_SIZE_PERPENDICULAR_RB = PV(APU_NAME + ":MOTOR_H.RBV") APU_CENTER_PERPENDICULAR = PV(APU_NAME + ":MOTOR_Y.VAL") APU_CENTER_PERPENDICULAR_RB = PV(APU_NAME + ":MOTOR_Y.RBV") # Camera field name data_field_name = "x_fit_mean" # Camera to be used pip_instance_id = camera_name + "_sp" ### Acquisition start # Get reference position apu_center_ref = APU_CENTER_RB.get() apu_size_ref = APU_SIZE_RB.get() apu_size_perp_ref = APU_SIZE_PERPENDICULAR.get() apu_center_perp_ref = APU_CENTER_PERPENDICULAR.get() logger.info("BU/BD sent to start") BU.put(bender_us_start, wait=False) BD.put(bender_ds_start, wait=False) APU_SIZE.put(aperture_size, wait=False) APU_SIZE_PERPENDICULAR.put(aperture_size_pendicular, wait=False) APU_CENTER.put(aperture[0], wait=False) BU.put(bender_us_start, wait=True) BD.put(bender_ds_start, wait=True) time.sleep(wait_time_benders) logger.info(f"BU to start: {bender_us_start:6.3f}") logger.info(f"BD to start: {bender_ds_start:6.3f}") bender_us_rb = [] bender_ds_rb = [] beam_positions = [] bender_scan_data = {} datestr, timestr = generate_date_and_time_str() for bu in bender_us: if stop_measurement.is_set(): logger.info("Measurement aborted, will stop after this upstream bender position") break logger.info("") BU.put(bu, wait=False) for bd in bender_ds: if stop_measurement.is_set(): logger.info("Measurement aborted, will stop after downstream bender position") break BU.put(bu, wait=False) BD.put(bd, wait=False) BU.put(bu, wait=True) BD.put(bd, wait=True) time.sleep(wait_time_benders) logger.info(f" BU / BD positions = {bu:6.3f} / {bd:6.3f}") bender_us_rb.append(BU_RB.get()) bender_ds_rb.append(BD_RB.get()) beam_pos = [] for apu in aperture: APU_CENTER.put(apu, wait=True) time.sleep(wait_time_aperture) beam_pos_apu = get_position_from_pipeline(pip_instance_id, data_field_name, n_pulses=n_pulses) logger.info(f" Aperture position = {apu:6.3f}; Beam position = {beam_pos_apu:6.3f}") beam_pos.append(beam_pos_apu) time.sleep(wait_time_aperture) APU_CENTER.put(aperture[0], wait=False) beam_positions.append(beam_pos) intermediate_results.append([ BU_RB.get(), BD_RB.get(), beam_pos ]) BD.put(bender_ds_start, wait=True) logger.info("") logger.info(f"BD to start: {bender_ds_start:6.3f}") time.sleep(wait_time_benders) # save intermediate data bender_scan_data["bender_us"] = bender_us_rb bender_scan_data["bender_ds"] = bender_ds_rb bender_scan_data["beam_positions"] = beam_positions fpath = save_focusing_data(bender_scan_data, direction=direction, timestr=timestr, datestr=datestr) res_filepath = convert_focusing_to_bender_data(fpath) logger.info(f"BU to start: {bender_us_start:6.3f}") APU_SIZE.put(apu_size_ref, wait=False) APU_CENTER.put(apu_center_ref, wait=False) APU_SIZE_PERPENDICULAR.put(apu_size_perp_ref, wait=False) APU_CENTER_PERPENDICULAR.put(apu_center_perp_ref, wait=False) BU.put(bender_us_start, wait=False) BD.put(bender_ds_start, wait=False) logger.info(f"Data saved to: {res_filepath}") logger.info("Done") out_fpath.put('') return bender_scan_data def generate_date_and_time_str(): t = datetime.datetime.now() datestr = t.date().isoformat() timestr = t.isoformat(timespec='minutes') return datestr, timestr def save_focusing_data(bender_scan_data, direction="unknown", timestr=None, datestr=None, beamline_directory="/sf/cristallina/applications/beamline/snapshots/KBs/"): """ Saves bender focusing data to json for analysis in beamline directory. """ bender_scan_data["comment"] = "Cristallina bender focusing data" if timestr is None: datestr, timestr = generate_date_and_time_str() directory = Path(beamline_directory) / datestr directory.mkdir(parents=True, exist_ok=True) fpath = directory / f"C_{timestr}_{direction}.json" with open(fpath, "w") as f: json.dump(bender_scan_data, f) return fpath def convert_focusing_to_bender_data(focusing_json_file): """ Converts focusing data to text array for further processing. """ fpath = Path(focusing_json_file) with open(fpath, "r") as f: focusing_data = json.loads(f.read()) nrows = len(focusing_data['bender_us']) arr = np.empty((nrows, 5)) arr[:, 0] = focusing_data['bender_us'] arr[:, 1] = focusing_data['bender_ds'] arr[:, 2:] = focusing_data['beam_positions'] Diff1 = arr[:,2] - arr[:,3] Diff2 = arr[:,4] - arr[:,3] # extend array with difference columns arr = np.c_[arr, Diff1] arr = np.c_[arr, Diff2] out_fpath = fpath.with_suffix(".dat") np.savetxt(out_fpath, arr) return out_fpath class ReadData: '''Class to read either intermediate or final data written to file. Calculates beam movement for each bender position and the object can be fed to data plotting.''' def __init__(self, datasource): self.B1 =[] self.B2 =[] self.P1 =[] self.P2 =[] self.P3 =[] # Check if the data is intermediate or final if type(datasource) == list: self.filename = "Intermediate data" self.measured_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") for point in datasource: self.B1.append(float(point[0])) self.B2.append(float(point[1])) self.P1.append(float(point[2][0])) self.P2.append(float(point[2][1])) self.P3.append(float(point[2][2])) elif type(datasource) == str: self.filename = datasource print('open file: {}'.format(self.filename)) if self.filename[-4:] == 'json': f = pd.read_json(self.filename) # Get the file creation time (platform-dependent) creation_time = os.path.getctime(self.filename) # Convert to formatted string formatted_time = datetime.datetime.fromtimestamp(creation_time).strftime("%Y-%m-%d %H:%M:%S") self.measured_date = formatted_time self.B1 = list(f['bender_us']) self.B2 = list(f['bender_ds']) for i in range(len(self.B1)): self.P1.append(f['beam_positions'][i][0]) self.P2.append(f['beam_positions'][i][1]) self.P3.append(f['beam_positions'][i][2]) else: f = open(self.filename, 'r') for line in f: line = line.strip(' \t\n\r') print(line) if 'Date' in line: self.measured_date = line[-19:] if (line and line[0]!='#'): columns = line.split() self.B1.append(float(columns[0])) self.B2.append(float(columns[1])) self.P1.append(float(columns[2])) self.P2.append(float(columns[3])) self.P3.append(float(columns[4])) f.close() # close file else: raise KeyError("datasource must be given as a string (filepath) or intermediate data list.\n\nLikely the data is still being recorded") self.B1=np.array(self.B1) self.B2=np.array(self.B2) self.P1=np.array(self.P1) self.P2=np.array(self.P2) self.P3=np.array(self.P3) self.D1=self.P1- self.P2 self.D3=self.P3- self.P2 # Plot data def plotdata(H,Plot3D=True,PlotContour=False,show_figs=True): '''Plot data''' imax = H.B1.shape[0] assert imax >= 4, f'There must be at least 4 points to evaluate the data, so far there are {imax} datapoints.' ii = np.arange(imax) #-------------- generate 2d matrix for positions ------------------------------ nx = 10 X1 = np.min(H.B1); X2=np.max(H.B1); dX = (X2-X1)/nx x1 = np.arange(nx+1) * dX + X1 ny = 10 Y1 = np.min(H.B2); Y2=np.max(H.B2); dY = (Y2-Y1)/ny y1 = np.arange(ny+1) * dY + Y1 b1 = np.tile(x1,(ny+1, 1)) # create 2d position matrix, b2 = np.tile(y1,(nx+1, 1)) # b2 = b2.transpose() #--------------- Analytical: Fit Plane to 2d-data---------------------- A = np.array([ np.ones(np.size(H.B1)), H.B1[:], H.B2[:] ]).T b = np.array([H.D1[:]]).T fit1 = np.dot(np.dot(inv(np.dot(A.T, A)), A.T), b) M1 = fit1[0] + fit1[1] * b1 + fit1[2] * b2 b = np.array([H.D3[:]]).T fit3 = np.dot(np.dot(inv(np.dot(A.T, A)), A.T), b) M3 = fit3[0] + fit3[1] * b1 + fit3[2] * b2 G1 = -(fit1[0] + fit1[1]*x1) / fit1[2] G3 = -(fit3[0] + fit3[1]*x1) / fit3[2] m1= -fit1[1] / fit1[2] ; n1 = -fit1[0] / fit1[2] m3= -fit3[1] / fit3[2] ; n3 = -fit3[0] / fit3[2] xs = (n3-n1)/(m1-m3) ys = m1*xs + n1 # ys = m3*xs + n3 #-------------- Fit both beamlets ------------------------------ ys=np.atleast_1d(ys) xs=np.atleast_1d(xs) ##################################### P L O T ###############################33 if Plot3D: fig1 = plt.figure('3D',figsize=(10,10)) plt.clf() ax = fig1.add_subplot(111, projection='3d') ax.plot(H.B1, H.B2, H.D1, 'o', color="red") ax.plot(H.B1, H.B2, H.D3, 'o', color="blue") ax.set_xlabel("B1 (mm)") ax.set_ylabel("B2 (mm)") ax.set_zlabel('Difference (pxl)') ax.set_title(f'{H.filename}'+'\n'+'\n'+"Best bender setting (B1, B2) = {0:4.3f} {1:4.3f}".format(xs[0],ys[0])) ax.plot_wireframe(b1, b2, M1, rstride=1, cstride=1,color="red") ax.plot_wireframe(b1, b2, M3, rstride=1, cstride=1,color="blue") bottom=ax.get_zlim()[0] size=1 plt.xlim([X1-size*dX,X2+size*dX]) plt.ylim([Y1-size*dY,Y2+size*dY]) # Mask G1, G3 to be inside lims size=5 for i in np.arange(len(G1)): if G1[i] < Y1-size*dY: G1[i] = np.nan elif G1[i] > Y2+size*dY: G1[i] = np.nan else: pass ax.plot(x1,G1,0, color="darkred", lw=3) ax.plot(x1,G3,0, color="darkblue", lw=3) ax.plot(xs,ys,0,'o', color="black", markersize=12) outstring="B1={0:4.3f} B2={1:4.3f}".format(xs[0], ys[0]) if H.filename !='Intermediate data': figname = H.filename+"_3d.png" else: figname = 'Intermediate data - 3D' fig1.text(1.0,0.0,H.measured_date + " "+figname,fontsize=12 , va='bottom', ha='right') # fig1.text(0, 0, __file__ , transform=fig1.transFigure, color= 'grey', verticalalignment='bottom', horizontalalignment='left',rotation=0) bot =ax.get_zlim()[0] # print("bottom=",bot) # cp1= ax.contour(b1, b2, M1, 10, lw=3, cmap="autumn_r", offset=bot, linestyles='dashed' ) # cp3= ax.contour(b1, b2, M3, 10, lw=3, cmap="autumn_r", offset=bot, linestyles='solid' ) # plt.clabel(cp1, inline=True, fontsize=10, colors = 'k', fmt = '%3.2f') # plt.clabel(cp3, inline=True, fontsize=10, colors = 'k', fmt = '%3.2f') plt.plot(xs , ys , bot,'o', color="black", markersize=12) plt.plot(H.B1, H.B2, bot,'o', color="black", markersize=4) # ax.plot(x1,G1,bot, color="darkred") # ax.plot(x1,G3,bot, color="darkblue") if H.filename !='Intermediate data': plt.savefig(figname) if PlotContour: # fig2 = plt.figure(2,figsize=(10,8)) fig2 = plt.figure('contour',figsize=(10,10)) plt.clf() ax = plt.gca() plt.xlabel("B1 (mm)") plt.ylabel("B2 (mm)") plt.title(H.filename) cp1= plt.contour(b1, b2, M1 , linestyles='dashed' ) cp3= plt.contour(b1, b2, M3 , linestyles='solid') # proxy1 = plt.Rectangle((0, 1), 1, 3, label='foo') # proxy2 = plt.Rectangle((0, 0), 1, 1, label='bar') # plt.legend(handles=[proxy1, proxy2], labels=['diff 1', 'diff2']) plt.clabel(cp1, inline=True, fontsize=10, colors = 'k', fmt = '%3.2f') plt.clabel(cp3, inline=True, fontsize=10, colors = 'k', fmt = '%3.2f') plt.plot(x1,G1, color="black", ls='dashed',lw=2) plt.plot(x1,G3, color="black",ls='solid',lw=2) plt.plot(xs,ys,'o', color="black", markersize=12) plt.plot(H.B1,H.B2,'o') # sys.exit() size=1 plt.xlim([X1-size*dX,X2+size*dX]) plt.ylim([Y1-size*dY,Y2+size*dY]) # plt.grid(b=True, which='both', color='0.65',linestyle='-') outstring="B1={0:4.3f} B2={1:4.3f}".format(xs[0], ys[0]) plt.text(xs[0], ys[0] , outstring , va='top' ,fontsize=10, bbox=dict(facecolor='white', alpha=0.5)) plt.text(xs[0], ys[0] , outstring , va='top' ,fontsize=10, bbox=dict(facecolor='white', alpha=0.5)) # plt.text(0, 0, __file__ , transform=fig2.transFigure, color= 'grey', verticalalignment='bottom', horizontalalignment='left',rotation=0) plt.text(0.02,0.04 , " dashed : delta1\n dotted : delta3", transform=ax.transAxes, va='top' ,fontsize=10, bbox=dict(facecolor='white', alpha=0.5)) ax.set_title(f'{H.filename}'+'\n'+'\n'+"Best bender setting (B1, B2) = {0:4.3f} {1:4.3f}".format(xs[0],ys[0])) figname =H.filename+"_contour.png" fig2.text(1.0,0.0,H.measured_date + " "+figname,fontsize=12 , va='bottom',ha='right') if H.filename !='Intermediate data': plt.savefig(figname) if show_figs: plt.show() # show plots on screen