import sys import os from cam_server.pipeline.data_processing import functions # Emittance measurement. Method determines the vertical electron # beam size using vertically polarized synchrotron radiation in # the visible to uv range. Images are acquired by the pi-polarization # method. import numpy as np from scipy import interpolate from scipy.signal import find_peaks import math from logging import getLogger _logger = getLogger(__name__) class PpolPeakValley(): def __init__(self): self.x, self.y = self.ppol_interpol() def ppol_interpol(self, plot_flag=False): ipv = [0.0, 32.976, 20.848, 15.306, 12.121, 10.050, 8.595, 7.518, 6.687, 6.028, 5.491, 5.047, 4.673, 4.353, 4.078, 3.837, 3.439, 3.122, 2.865, 2.652, 2.473, 2.320] sig = [0.0, 3.659, 5.175, 6.338, 7.318, 8.182, 8.963, 9.681, 10.350, 10.978, 11.572, 12.136, 12.676, 13.194, 13.692, 14.172, 15.087, 15.950, 16.769, 17.549, 18.296, 19.014] #ipv = [0.0, 43.237,26.505,19.129,14.978,12.316,10.466,9.104,8.061,7.237,6.569,6.017,5.553,5.159,4.819,4.523,4.033,3.645,3.330,3.070,2.851,2.666] #sig = [0.0, 3.659, 5.175, 6.338, 7.318, 8.182, 8.963, 9.681, 10.350, 10.978, 11.572, 12.136, 12.676, 13.194, 13.692, 14.172, 15.087, 15.950, 16.769, 17.549, 18.296, 19.014] sig2 = [element * element for element in sig] x = np.linspace(0, 999999, 999999) x = [(1+val)/1000000*max(ipv) for val in x] interpfunc = interpolate.interp1d(ipv, sig2, kind='quadratic') inter_x = interpfunc(x) sqrt_val = [math.sqrt(val) for val in inter_x] y = [0] y.extend(sqrt_val) y.append(max(sig)) xf = [0] xf.extend(x) xf.append(max(ipv)) return xf, y def get_emittance(self, ratio): emittance = 0 for i in range(0, len(self.x)): if self.x[i] > ratio: self.y[i-1] return self.y[i-1] #For peak search - delta(h) to max peak value DELTA_HEIGHT = 60000 BG_XRANGE_LOW = [0, 400] BG_XRANGE_HIGH = [1921, 2112] PEAK_SEARCH_REL_RANGE = [-2, 3] VALLEY_SEARCH_REL_RANGE = [-2, 3] ppol = PpolPeakValley() # abreviations BX1 = BG_XRANGE_LOW[0] BX2 = BG_XRANGE_LOW[1] BX3 = BG_XRANGE_HIGH[0] BX4 = BG_XRANGE_HIGH[1] plr = PEAK_SEARCH_REL_RANGE vlr = VALLEY_SEARCH_REL_RANGE ydata = [] def calculate_emittance(image, fit_pars): global ydata # array of indexes DELTA_HEIGHT = fit_pars['delta_height'] BG_XRANGE_LOW = fit_pars['bg_range_low'] BG_XRANGE_HIGH = fit_pars['bg_range_high'] PEAK_SEARCH_REL_RANGE = fit_pars['peak_search_rel_range'] VALLEY_SEARCH_REL_RANGE = fit_pars['valley_search_rel_range'] w, h = len(image[0]), len(image) if len(ydata) != h: H = [] for i in range(0, h): H.append(i) ydata = H[:int(h)] peak_array = [None] * 2 proj_peak_array = [None] * 2 peak_value = [None] * 2 peak_bg = [None] * 2 image -= image.min() projy = np.sum(image, axis=1) projx = np.sum(image, axis=0) # find peaks max_element = np.amax(projy) max_indices = np.where(projy == max_element) peaks, _ = find_peaks(projy, height=(max_element - DELTA_HEIGHT)) _logger.debug("max indices /peaks " + str(max_indices) + " " + str(peaks)) if len(peaks) != 2: mess = "Too few peaks found! " if len(peaks) < 2 else \ "Too many peaks found " _logger.debug(mess + str(peaks)) peaks_buffer = [] for val in peaks: ### COMMENTED BY ALEX # if val > 567 and val < 590: peaks_buffer.append(val) # if len(peaks_buffer) ==3: # peaks = [None] * 2 # peaks[0] = peaks_buffer[0] # peaks[1] = peaks_buffer[2] if len(peaks_buffer) != 2: return (-1.0, -2.0) else: peaks = peaks_buffer if (peaks[1] - peaks[0]) < 6: _logger.debug("Peaks are too close: " + str(peaks[1] - peaks[0])) raise Exception("Peaks are too close") # peaks =[569, 577] # Distance to minimum min_element = np.amin(projy[peaks[0]:peaks[1]]) min_indices = np.where(projy == min_element) min_idx_value = 0 # min_indices[0][0] for val in min_indices[0]: if val > peaks[0] and val < peaks[1]: min_idx_value = val break if min_idx_value == 0: raise Exception("min_idx_value == 0") for i in range(0, len(peak_array)): peak_array[i] = ydata[peaks[i] + plr[0]: peaks[i] + plr[1]] valley_array = ydata[min_idx_value + vlr[0]: min_idx_value + vlr[1]] # print("peaks", peaks, flush=True) # print(np.subtract(peaks, h)*(-1)) # print("projections peak, valley", projy[(peaks[0]-1):(peaks[1]+2)], projy[valley_array]) # print(peak_array, valley_array, flush=True) # x_bg_center = min_indices[0][0] # background bg_y1 = projy[BX1: BX2] bg_y2 = projy[BX3: BX4] bg_yS = np.concatenate((bg_y1, bg_y2)) bg_xS = list(range(BX1, BX2)) + list(range(BX3, BX4)) bg_x = [] bg_y = [] for x, y in zip(bg_xS, bg_yS): ### COMMENTED BY ALEX # if y > 800 and y < 1000: bg_x.append(x) bg_y.append(y) # print(bg_x, flush=True) # print(bg_y, flush=True) # fit poly_bg = np.polyfit(bg_x, bg_y, deg=1) array_bg = np.linspace(0, h, 10400) val_bg = np.polyval(poly_bg, array_bg) for i in range(0, len(proj_peak_array)): proj_peak_array[i] = projy[peaks[i] + plr[0]: peaks[i] + plr[1]] # proj_peak_array[1] = projy[peaks[1]-1 : peaks[1]+2] proj_valley = projy[min_idx_value + vlr[0]: min_idx_value + vlr[1]] # peaks for i in range(0, 2): poly = np.polyfit(peak_array[i], proj_peak_array[i], deg=2) idx = -poly[1] / 2 / poly[0] peak_value[i] = np.polyval(poly, idx) peak_bg[i] = val_bg[int(idx)] # valley poly2 = np.polyfit(valley_array, proj_valley, deg=2) # Only works for deg=2 minv_idx = -poly2[1] / 2 / poly2[0] poly = np.polyfit(valley_array, proj_valley, deg=4) valley_subarray = np.linspace(valley_array[0], valley_array[-1], 800) poly_array = np.polyval(poly, valley_subarray) valley_fitted_value = min(poly_array) # print("peak value", peak_value, "valley_fitted value", valley_fitted_value, flush=True) valley_bg = val_bg[int(minv_idx)] # print("valley background", valley_bg, "peak background", peak_bg[0], peak_bg[1], flush=True) ### COMMENTED BY ALEX # if valley_fitted_value < valley_bg: # valley_fitted_value = min( projy[valley_array]) # if valley_fitted_value < valley_bg: # raise Exception("valley_fitted_value < valley_bg") # #return #ratio_corrected = 2 * (valley_fitted_value - valley_bg) / ( # abs((peak_value[0] - peak_bg[0]) + (peak_value[1] - peak_bg[1]))) #idx = 0 if abs(peak_value[0] - peak_bg[0]) > abs(peak_value[1] - peak_bg[1]) else 1 #ratio_max = (valley_fitted_value - valley_bg) / abs(peak_value[idx] - peak_bg[idx]) ratio_corrected = 2 * (valley_fitted_value) / (abs(peak_value[0] + peak_value[1])) idx = 0 if abs(peak_value[0] ) > abs(peak_value[1] ) else 1 ratio_max = valley_fitted_value /abs(peak_value[idx]) emittance = ppol.get_emittance(ratio_corrected) emittance2 = ppol.get_emittance(ratio_max) _logger.debug("ratio=%f emittance %f ratio2=%f emittance2 %f" % (ratio_corrected, emittance, ratio_max, emittance2)) return (emittance, emittance2) def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None): channel_prefix = parameters["camera_name"] emittance = emittance2 = float("NaN") status = "Ok" try: ret = calculate_emittance(image, parameters['fit']) if ret is not None: emittance, emittance2 = ret except Exception as e: status = "Error: " exc_type, exc_obj, exc_tb = sys.exc_info() while exc_tb is not None: fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] status = status + fname + " line " + str(exc_tb.tb_lineno) + ": " + str(e) + " | " exc_tb = exc_tb.tb_next ret = {} ret[channel_prefix + ":emmitance"] = emittance ret[channel_prefix + ":emmitance2"] = emittance2 ret[channel_prefix + ":status"] = str(status) return ret