227 lines
8.1 KiB
Python
Executable File
227 lines
8.1 KiB
Python
Executable File
# Signal processing functions
|
|
import copy
|
|
from mathutils import *
|
|
|
|
|
|
# Noise evaluation ####
|
|
def noise_evaluation(noise):
|
|
"""
|
|
Returns offset and standard deviation of the noise signal.
|
|
:param noise: noise signal
|
|
:return: Tuple of (noise_offset, noise_sigma))
|
|
"""
|
|
|
|
return mean(noise), stdev(noise)
|
|
|
|
|
|
# Processing of BLM signal ####
|
|
def blm_remove_spikes(x):
|
|
"""
|
|
Returns new array with removed 1 and 2 point spikes.
|
|
:param x: input array
|
|
:return: output array
|
|
"""
|
|
|
|
x = copy.copy(x)
|
|
if x.size > 5: # Must have enough sample points
|
|
d_x = x[1:] - x[:-1]
|
|
maximum = x.max() - x.min()
|
|
|
|
for i in range(x.size-3):
|
|
if d_x[i+1] > 0.5 * maximum and d_x[i+2] < -0.5 * maximum:
|
|
# 1 point spike
|
|
x[i+2] = (x[i:i+5].sum() - x[i+2])/4
|
|
|
|
if d_x[i+1] > 0.5 * maximum and d_x[i+2] >= -0.5 * maximum:
|
|
# 2 point spikes
|
|
x[i+2] = (x[i:i+2].sum() + x[i+4])/3
|
|
x[i+3] = (x[i+1] + x[i+4:i+6].sum())/3
|
|
|
|
# Handle edge points
|
|
if d_x[-1] > 0.5 * maximum:
|
|
x[-1] = x[-3:-1].sum() / 2
|
|
|
|
if d_x[-2] > 0.5 * maximum:
|
|
x[-2] = (x[-1] + x[-4:-2].sum()) / 3
|
|
if d_x[0] < -0.5 * maximum:
|
|
x[0] = x[1:3].sum() / 2
|
|
if d_x[1] < -0.5*maximum:
|
|
x[1] = (x[0] + x[2:4].sum()) / 3
|
|
return x
|
|
|
|
|
|
def blm_normalize(x, q):
|
|
"""
|
|
Returns normalized BLM signal using charge from the BPM
|
|
:param x: BLM signal
|
|
:param q: BPM charge
|
|
:return: output array (same instance as input array)
|
|
"""
|
|
x = copy.copy(x)
|
|
return x / q
|
|
|
|
|
|
# Processing of position data ####
|
|
def motor_to_wire_cs(pos, type: str = 'u', center_pos: float = 0.0):
|
|
"""
|
|
Map wire position from motor coordinates (encoder readout) to wire coordinates. center_pos should be motor position
|
|
when wire is in the middle of the pipe.
|
|
|
|
:param pos: position in motor coordinates
|
|
:param type: type of wire 'x', 'y' or 'u' (under 45 degrees to the pipe horizon)
|
|
:param center_pos: wire position in motor coordinates when in the middle of the pipe
|
|
:return: position in wire coordinates
|
|
"""
|
|
|
|
# u: normal positions, like GARAGE, POSITION, or any wires under 45 degrees to pipe horizon
|
|
# x: which crosses pipe vertically (scans in x coordinate)
|
|
# y: which crosses pipe horizontally (scans in y coordinate)
|
|
w_scale = {'u': 1, 'x': -math.sqrt(2).sqrt(2), 'y': math.sqrt(2).sqrt(2)}
|
|
|
|
return (pos-center_pos)/w_scale[type]
|
|
|
|
|
|
def remove_beam_jitter(pos, bpm1, bpm2, d_b1_w=1, d_w_b2=1):
|
|
"""
|
|
This is a temporary solution which works only for first WSC on SwissFEL.
|
|
!!TODO: In future this method must implement correction using online model.
|
|
|
|
:param pos: array of positions for 1 profile
|
|
:param bpm1: array of bpm in front of wsc readings (x or y) for 1 profile
|
|
:param bpm2: array of after wsc bpm readings (x or y) for 1 profile
|
|
:param d_b1_w: Distance between bpm1 and wsc
|
|
:param d_w_b2: Distance between wsc and bpm2
|
|
:return: array of corrected positions
|
|
"""
|
|
|
|
beam_position_at_wsc = (bpm1 * d_b1_w + bpm2 * d_w_b2) / (d_b1_w + d_w_b2)
|
|
|
|
# Reference centroid position is mean value of all measured values
|
|
beam_jitter = beam_position_at_wsc - beam_position_at_wsc.mean(0)
|
|
|
|
# Correct wire position for beam jitter
|
|
return pos + beam_jitter
|
|
|
|
|
|
# Profile statistics ####
|
|
def profile_gauss_stats(x, y, off=None, amp=None, com=None, sigma=None):
|
|
if off is None:
|
|
off = y.min() # good enough starting point for offset
|
|
|
|
if com is None:
|
|
#com = x[y.argmax()]
|
|
com = x[y.index(max(y))]
|
|
|
|
if amp is None:
|
|
amp = y.max() - off
|
|
|
|
# For normalised gauss curve sigma=1/(amp*sqrt(2*pi))
|
|
if sigma is None:
|
|
surface = integrate((y-off), xdata=x)
|
|
sigma = surface / (amp * math.sqrt(2 * math.pi))
|
|
try:
|
|
#popt, pcov = scipy.optimize.curve_fit(gauss_fn, x, y, p0=[off, amp, com, sigma], method='lm')
|
|
#popt[3] = abs(popt[3]) # sigma should be returned as positive
|
|
pars = fit_gaussian(y, x, start_point = [amp, com, sigma])
|
|
popt = [0.0, pars[0], pars[1], abs([pars[2])]
|
|
|
|
|
|
except Exception as e:
|
|
print("Gauss fitting not successful.\n" + str(e))
|
|
popt = [off, amp, com, abs(sigma)]
|
|
|
|
return popt
|
|
|
|
|
|
def gauss_fn(x, a, b, c, d):
|
|
return a + b * math.exp(-(math.pow((x - c), 2) / (2 * math.pow(d, 2))))
|
|
|
|
|
|
def profile_rms_stats(x, y, noise_std=0, n_sigma=3.5):
|
|
"""
|
|
Does the center of mass and RMS calculation on given profile.
|
|
|
|
:param x: data of x axis (wire position)
|
|
:param y: data of y axis (blm current)
|
|
:param noise_std: standard deviation of noise
|
|
:param n_sigma: multiplication of window size (n_sigma*sigma) where sigma is first estimation of sigma
|
|
:return: rms of signal
|
|
"""
|
|
|
|
if x.shape == y.shape:
|
|
# Windowing the signal
|
|
# 1. All points bellow +sigma are treated as noise and ignored
|
|
# 2. If point is higher than +sigma and neighbour points are noise, then it is also treated as noise and ignored
|
|
# 3. All sets of multiple non noise points in a row are treated as signal_candidates
|
|
signal_flag = False
|
|
signal_start = 0
|
|
signal_candidates = list()
|
|
for i in range(y.size):
|
|
if y[i] < noise_std:
|
|
# Not part of the signal. Check if this is the end of signal candidate.
|
|
if signal_flag and i-signal_start > 1:
|
|
signal_candidates.append((signal_start, i-1))
|
|
signal_flag = False
|
|
|
|
elif not signal_flag:
|
|
# Start of potential new signal candidate
|
|
signal_flag = True
|
|
signal_start = i
|
|
|
|
# Take care of last signal candidate if last point is higher than noise (signal_flag is still True)
|
|
if signal_flag and (y.size - signal_start - 1) > 1:
|
|
signal_candidates.append((signal_start, y.size-1))
|
|
|
|
if len(signal_candidates) == 0:
|
|
# Only noise, no signal. Cannot calculate rms
|
|
print("RMS calculation not successful. No signal detected.\n")
|
|
return None, None
|
|
|
|
# Evaluate candidates
|
|
candidates_val = [y[s[0]:s[1]].sum() for s in signal_candidates]
|
|
signal_boundaries = signal_candidates[candidates_val.index(max(candidates_val))]
|
|
|
|
# Calculate RMS values (at this point they are underestimated, but sigma is then used to extend the signal
|
|
# region of interest which gives a better result (biggest surface)
|
|
com_estimate, rms_estimate = com_rms(x[signal_boundaries[0]:signal_boundaries[1]],
|
|
y[signal_boundaries[0]:signal_boundaries[1]])
|
|
|
|
# Indexes of useful signal (-1 is here because original algorithm calculates this a bit different and
|
|
# because it does the floor() of calculated function
|
|
|
|
# Define direction of scan
|
|
if x[0] < x[-1]: # increasing position
|
|
low_boundary = np.argmax(x >= com_estimate - n_sigma * rms_estimate)-1
|
|
high_boundary = np.argmax(x >= com_estimate + n_sigma * rms_estimate)-1
|
|
|
|
else: # decreasing position
|
|
high_boundary = np.argmax(x <= com_estimate - n_sigma * rms_estimate)-1
|
|
low_boundary = np.argmax(x <= com_estimate + n_sigma * rms_estimate)-1
|
|
|
|
if high_boundary <= 0:
|
|
high_boundary = x.size
|
|
|
|
if low_boundary < 0:
|
|
low_boundary = 0
|
|
|
|
if (high_boundary-low_boundary) < 4:
|
|
print("RMS calculation not successful. Resolution to low.\n")
|
|
return None, None
|
|
|
|
# Calculate final RMS and centroid on ROI data
|
|
if high_boundary == x.size:
|
|
com, rms = com_rms(x[low_boundary:], y[low_boundary:])
|
|
else:
|
|
com, rms = com_rms(x[low_boundary:high_boundary], y[low_boundary:high_boundary])
|
|
|
|
return com, rms
|
|
else:
|
|
return None, None
|
|
|
|
|
|
def com_rms(x, y):
|
|
# Centre of mass and rms
|
|
com = sum([a*b for a,b in zip(x,y)]) / sum(y)
|
|
com2 = sum([a*a*b for a,b in zip(x,y)]) / sum(y)
|
|
rms = math.sqrt(math.abs(com2 - com * com))
|
|
return com, rms |