Files
dev/script/sig_process.py
2018-04-17 12:05:48 +02:00

221 lines
8.0 KiB
Python
Executable File

# Signal processing functions
import copy
import numpy as np
import scipy.optimize
# Noise evaluation ####
def noise_evaluation(noise: np.ndarray):
"""
Returns offset and standard deviation of the noise signal.
:param noise: noise signal
:return: Tuple of (noise_offset, noise_sigma))
"""
return np.mean(noise), np.std(noise)
# Processing of BLM signal ####
def blm_remove_spikes(x):
"""
Returns new array with removed 1 and 2 point spikes.
:param x: input array
:return: output array
"""
x = copy.copy(x)
if x.size > 5: # Must have enough sample points
d_x = x[1:] - x[:-1]
maximum = x.max() - x.min()
for i in range(x.size-3):
if d_x[i+1] > 0.5 * maximum and d_x[i+2] < -0.5 * maximum:
# 1 point spike
x[i+2] = (x[i:i+5].sum() - x[i+2])/4
if d_x[i+1] > 0.5 * maximum and d_x[i+2] >= -0.5 * maximum:
# 2 point spikes
if i < x.size-4: #TODO: FIX BY AG, CHECK
x[i+2] = (x[i:i+2].sum() + x[i+4])/3
if i < x.size-6: #TODO: FIX BY AG, CHECK
x[i+3] = (x[i+1] + x[i+4:i+6].sum())/3
# Handle edge points
if d_x[-1] > 0.5 * maximum:
x[-1] = x[-3:-1].sum() / 2
if d_x[-2] > 0.5 * maximum:
x[-2] = (x[-1] + x[-4:-2].sum()) / 3
if d_x[0] < -0.5 * maximum:
x[0] = x[1:3].sum() / 2
if d_x[1] < -0.5*maximum:
x[1] = (x[0] + x[2:4].sum()) / 3
return x
def blm_normalize(x, q):
"""
Returns normalized BLM signal using charge from the BPM
:param x: BLM signal
:param q: BPM charge
:return: output array (same instance as input array)
"""
x = copy.copy(x)
return x / q
# Processing of position data ####
def motor_to_wire_cs(pos, type: str = 'u', center_pos: float = 0.0):
"""
Map wire position from motor coordinates (encoder readout) to wire coordinates. center_pos should be motor position
when wire is in the middle of the pipe.
:param pos: position in motor coordinates
:param type: type of wire 'x', 'y' or 'u' (under 45 degrees to the pipe horizon)
:param center_pos: wire position in motor coordinates when in the middle of the pipe
:return: position in wire coordinates
"""
# u: normal positions, like GARAGE, POSITION, or any wires under 45 degrees to pipe horizon
# x: which crosses pipe vertically (scans in x coordinate)
# y: which crosses pipe horizontally (scans in y coordinate)
w_scale = {'u': 1, 'x': -np.sqrt(2), 'y': np.sqrt(2)}
return (pos-center_pos)/w_scale[type]
def remove_beam_jitter(pos: np.ndarray, bpm1: np.ndarray, bpm2: np.ndarray, d_b1_w=1, d_w_b2=1):
"""
This is a temporary solution which works only for first WSC on SwissFEL.
!!TODO: In future this method must implement correction using online model.
:param pos: array of positions for 1 profile
:param bpm1: array of bpm in front of wsc readings (x or y) for 1 profile
:param bpm2: array of after wsc bpm readings (x or y) for 1 profile
:param d_b1_w: Distance between bpm1 and wsc
:param d_w_b2: Distance between wsc and bpm2
:return: array of corrected positions
"""
beam_position_at_wsc = (bpm1 * d_b1_w + bpm2 * d_w_b2) / (d_b1_w + d_w_b2)
# Reference centroid position is mean value of all measured values
beam_jitter = beam_position_at_wsc - beam_position_at_wsc.mean(0)
# Correct wire position for beam jitter
return pos + beam_jitter
# Profile statistics ####
def profile_gauss_stats(x, y, off=None, amp=None, com=None, sigma=None):
if off is None:
off = y.min() # good enough starting point for offset
if com is None:
com = x[y.argmax()]
if amp is None:
amp = y.max() - off
# For normalised gauss curve sigma=1/(amp*sqrt(2*pi))
if sigma is None:
surface = np.trapz((y-off), x=x)
sigma = surface / (amp * np.sqrt(2 * np.pi))
try:
popt, pcov = scipy.optimize.curve_fit(gauss_fn, x, y, p0=[off, amp, com, sigma], method='lm')
popt[3] = abs(popt[3]) # sigma should be returned as positive
except Exception as e:
print("Gauss fitting not successful.\n" + str(e))
popt = [off, amp, com, abs(sigma)]
return popt
def gauss_fn(x, a, b, c, d):
return a + b * np.exp(-(np.power((x - c), 2) / (2 * np.power(d, 2))))
def profile_rms_stats(x, y, noise_std=0, n_sigma=3.5):
"""
Does the center of mass and RMS calculation on given profile.
:param x: data of x axis (wire position)
:param y: data of y axis (blm current)
:param noise_std: standard deviation of noise
:param n_sigma: multiplication of window size (n_sigma*sigma) where sigma is first estimation of sigma
:return: rms of signal
"""
if x.shape == y.shape:
# Windowing the signal
# 1. All points bellow +sigma are treated as noise and ignored
# 2. If point is higher than +sigma and neighbour points are noise, then it is also treated as noise and ignored
# 3. All sets of multiple non noise points in a row are treated as signal_candidates
signal_flag = False
signal_start = 0
signal_candidates = list()
for i in range(y.size):
if y[i] < noise_std:
# Not part of the signal. Check if this is the end of signal candidate.
if signal_flag and i-signal_start > 1:
signal_candidates.append((signal_start, i-1))
signal_flag = False
elif not signal_flag:
# Start of potential new signal candidate
signal_flag = True
signal_start = i
# Take care of last signal candidate if last point is higher than noise (signal_flag is still True)
if signal_flag and (y.size - signal_start - 1) > 1:
signal_candidates.append((signal_start, y.size-1))
if len(signal_candidates) == 0:
# Only noise, no signal. Cannot calculate rms
print("RMS calculation not successful. No signal detected.\n")
return None, None
# Evaluate candidates
candidates_val = [y[s[0]:s[1]].sum() for s in signal_candidates]
signal_boundaries = signal_candidates[candidates_val.index(max(candidates_val))]
# Calculate RMS values (at this point they are underestimated, but sigma is then used to extend the signal
# region of interest which gives a better result (biggest surface)
com_estimate, rms_estimate = com_rms(x[signal_boundaries[0]:signal_boundaries[1]],
y[signal_boundaries[0]:signal_boundaries[1]])
# Indexes of useful signal (-1 is here because original algorithm calculates this a bit different and
# because it does the floor() of calculated function
# Define direction of scan
if x[0] < x[-1]: # increasing position
low_boundary = np.argmax(x >= com_estimate - n_sigma * rms_estimate)-1
high_boundary = np.argmax(x >= com_estimate + n_sigma * rms_estimate)-1
else: # decreasing position
high_boundary = np.argmax(x <= com_estimate - n_sigma * rms_estimate)-1
low_boundary = np.argmax(x <= com_estimate + n_sigma * rms_estimate)-1
if high_boundary <= 0:
high_boundary = x.size
if low_boundary < 0:
low_boundary = 0
if (high_boundary-low_boundary) < 4:
print("RMS calculation not successful. Resolution to low.\n")
return None, None
# Calculate final RMS and centroid on ROI data
if high_boundary == x.size:
com, rms = com_rms(x[low_boundary:], y[low_boundary:])
else:
com, rms = com_rms(x[low_boundary:high_boundary], y[low_boundary:high_boundary])
return com, rms
else:
return None, None
def com_rms(x, y):
# Centre of mass and rms
com = (x * y).sum() / y.sum()
com2 = (x * x * y).sum() / y.sum()
rms = np.sqrt(np.abs(com2 - com * com))
return com, rms