# Signal processing functions import copy from mathutils import * # Noise evaluation #### def noise_evaluation(noise): """ Returns offset and standard deviation of the noise signal. :param noise: noise signal :return: Tuple of (noise_offset, noise_sigma)) """ return mean(noise), stdev(noise) # Processing of BLM signal #### def blm_remove_spikes(x): """ Returns new array with removed 1 and 2 point spikes. :param x: input array :return: output array """ x = copy.copy(x) if x.size > 5: # Must have enough sample points d_x = x[1:] - x[:-1] maximum = x.max() - x.min() for i in range(x.size-3): if d_x[i+1] > 0.5 * maximum and d_x[i+2] < -0.5 * maximum: # 1 point spike x[i+2] = (x[i:i+5].sum() - x[i+2])/4 if d_x[i+1] > 0.5 * maximum and d_x[i+2] >= -0.5 * maximum: # 2 point spikes x[i+2] = (x[i:i+2].sum() + x[i+4])/3 x[i+3] = (x[i+1] + x[i+4:i+6].sum())/3 # Handle edge points if d_x[-1] > 0.5 * maximum: x[-1] = x[-3:-1].sum() / 2 if d_x[-2] > 0.5 * maximum: x[-2] = (x[-1] + x[-4:-2].sum()) / 3 if d_x[0] < -0.5 * maximum: x[0] = x[1:3].sum() / 2 if d_x[1] < -0.5*maximum: x[1] = (x[0] + x[2:4].sum()) / 3 return x def blm_normalize(x, q): """ Returns normalized BLM signal using charge from the BPM :param x: BLM signal :param q: BPM charge :return: output array (same instance as input array) """ x = copy.copy(x) return x / q # Processing of position data #### def motor_to_wire_cs(pos, type: str = 'u', center_pos: float = 0.0): """ Map wire position from motor coordinates (encoder readout) to wire coordinates. center_pos should be motor position when wire is in the middle of the pipe. :param pos: position in motor coordinates :param type: type of wire 'x', 'y' or 'u' (under 45 degrees to the pipe horizon) :param center_pos: wire position in motor coordinates when in the middle of the pipe :return: position in wire coordinates """ # u: normal positions, like GARAGE, POSITION, or any wires under 45 degrees to pipe horizon # x: which crosses pipe vertically (scans in x coordinate) # y: which crosses pipe horizontally (scans in y coordinate) w_scale = {'u': 1, 'x': -math.sqrt(2).sqrt(2), 'y': math.sqrt(2).sqrt(2)} return (pos-center_pos)/w_scale[type] def remove_beam_jitter(pos, bpm1, bpm2, d_b1_w=1, d_w_b2=1): """ This is a temporary solution which works only for first WSC on SwissFEL. !!TODO: In future this method must implement correction using online model. :param pos: array of positions for 1 profile :param bpm1: array of bpm in front of wsc readings (x or y) for 1 profile :param bpm2: array of after wsc bpm readings (x or y) for 1 profile :param d_b1_w: Distance between bpm1 and wsc :param d_w_b2: Distance between wsc and bpm2 :return: array of corrected positions """ beam_position_at_wsc = (bpm1 * d_b1_w + bpm2 * d_w_b2) / (d_b1_w + d_w_b2) # Reference centroid position is mean value of all measured values beam_jitter = beam_position_at_wsc - beam_position_at_wsc.mean(0) # Correct wire position for beam jitter return pos + beam_jitter # Profile statistics #### def profile_gauss_stats(x, y, off=None, amp=None, com=None, sigma=None): if off is None: off = y.min() # good enough starting point for offset if com is None: #com = x[y.argmax()] com = x[y.index(max(y))] if amp is None: amp = y.max() - off # For normalised gauss curve sigma=1/(amp*sqrt(2*pi)) if sigma is None: surface = integrate((y-off), xdata=x) sigma = surface / (amp * math.sqrt(2 * math.pi)) try: #popt, pcov = scipy.optimize.curve_fit(gauss_fn, x, y, p0=[off, amp, com, sigma], method='lm') #popt[3] = abs(popt[3]) # sigma should be returned as positive pars = fit_gaussian(y, x, start_point = [amp, com, sigma]) popt = [0.0, pars[0], pars[1], abs([pars[2])] except Exception as e: print("Gauss fitting not successful.\n" + str(e)) popt = [off, amp, com, abs(sigma)] return popt def gauss_fn(x, a, b, c, d): return a + b * math.exp(-(math.pow((x - c), 2) / (2 * math.pow(d, 2)))) def profile_rms_stats(x, y, noise_std=0, n_sigma=3.5): """ Does the center of mass and RMS calculation on given profile. :param x: data of x axis (wire position) :param y: data of y axis (blm current) :param noise_std: standard deviation of noise :param n_sigma: multiplication of window size (n_sigma*sigma) where sigma is first estimation of sigma :return: rms of signal """ if x.shape == y.shape: # Windowing the signal # 1. All points bellow +sigma are treated as noise and ignored # 2. If point is higher than +sigma and neighbour points are noise, then it is also treated as noise and ignored # 3. All sets of multiple non noise points in a row are treated as signal_candidates signal_flag = False signal_start = 0 signal_candidates = list() for i in range(y.size): if y[i] < noise_std: # Not part of the signal. Check if this is the end of signal candidate. if signal_flag and i-signal_start > 1: signal_candidates.append((signal_start, i-1)) signal_flag = False elif not signal_flag: # Start of potential new signal candidate signal_flag = True signal_start = i # Take care of last signal candidate if last point is higher than noise (signal_flag is still True) if signal_flag and (y.size - signal_start - 1) > 1: signal_candidates.append((signal_start, y.size-1)) if len(signal_candidates) == 0: # Only noise, no signal. Cannot calculate rms print("RMS calculation not successful. No signal detected.\n") return None, None # Evaluate candidates candidates_val = [y[s[0]:s[1]].sum() for s in signal_candidates] signal_boundaries = signal_candidates[candidates_val.index(max(candidates_val))] # Calculate RMS values (at this point they are underestimated, but sigma is then used to extend the signal # region of interest which gives a better result (biggest surface) com_estimate, rms_estimate = com_rms(x[signal_boundaries[0]:signal_boundaries[1]], y[signal_boundaries[0]:signal_boundaries[1]]) # Indexes of useful signal (-1 is here because original algorithm calculates this a bit different and # because it does the floor() of calculated function # Define direction of scan if x[0] < x[-1]: # increasing position low_boundary = np.argmax(x >= com_estimate - n_sigma * rms_estimate)-1 high_boundary = np.argmax(x >= com_estimate + n_sigma * rms_estimate)-1 else: # decreasing position high_boundary = np.argmax(x <= com_estimate - n_sigma * rms_estimate)-1 low_boundary = np.argmax(x <= com_estimate + n_sigma * rms_estimate)-1 if high_boundary <= 0: high_boundary = x.size if low_boundary < 0: low_boundary = 0 if (high_boundary-low_boundary) < 4: print("RMS calculation not successful. Resolution to low.\n") return None, None # Calculate final RMS and centroid on ROI data if high_boundary == x.size: com, rms = com_rms(x[low_boundary:], y[low_boundary:]) else: com, rms = com_rms(x[low_boundary:high_boundary], y[low_boundary:high_boundary]) return com, rms else: return None, None def com_rms(x, y): # Centre of mass and rms com = sum([a*b for a,b in zip(x,y)]) / sum(y) com2 = sum([a*a*b for a,b in zip(x,y)]) / sum(y) rms = math.sqrt(math.abs(com2 - com * com)) return com, rms