import sys import json from scipy.interpolate import interp1d from scipy import signal from scipy.ndimage import gaussian_filter1d from scipy.ndimage import convolve1d import numpy as np # Alvra spectral encoder constants/waveforms px2fs = 1.91 # calibration from ... lambdas = 467.55 + 0.07219*np.arange(0,2048) # calibration from 23-9-2020 nus = 299792458 / (lambdas * 10**-9) # frequency space, uneven nus_new = np.linspace(nus[0], nus[-1], num=2048, endpoint=True) # frequency space, even filters = { 'YAG': np.concatenate((np.ones(50),signal.tukey(40)[20:40], np.zeros(1978), np.zeros(2048))), # fourier filter for YAGS 'SiN': np.concatenate((signal.tukey(40)[20:40], np.zeros(2028), np.zeros(2048))), # fourier filter for 5um SiN 'SiN2': np.concatenate((signal.tukey(32)[16:32], np.zeros(2032), np.zeros(2048))), # fourier filter for 2um SiN 'babyYAG': np.concatenate((signal.tukey(40)[20:40], np.zeros(2028), np.zeros(2048))), # baby timetool YAG filter 'babyYAG2': np.concatenate((np.ones(50),signal.tukey(40)[20:40], np.zeros(1978), np.zeros(2048))) # baby timetool YAG } # Functions for image analysis and spectral encoding processing def get_roi_projection(image, roi, axis): x_start, x_stop, y_start, y_stop = roi cropped = image[x_start:x_stop, y_start:y_stop] project = cropped.mean(axis=axis) return project def interpolate(xold, xnew, vals): ''' Interpolate vals from xold to xnew ''' interp = interp1d(xold, vals, kind='cubic') return interp(xnew) def fourier_filter(vals, filt): ''' Fourier transform, filter, inverse fourier transform, take the real part ''' vals = np.hstack((vals, np.zeros_like(vals))) # pad transformed = np.fft.fft(vals) filtered = transformed * filt inverse = np.fft.ifft(filtered) invreal = 2 * np.real(inverse) return invreal def edge(filter_name, backgrounds, signals, background_from_fit, peakback): ''' returns: edge positions determined from argmax of peak traces, converted to fs edge amplitudes determined from the amax of the peak traces the actual peak traces, which are the derivative of signal traces (below) the raw signal traces, without any processing or smoothing except for the Fourier filter applied to remove the etalon ''' ffilter = filters[filter_name] # background normalization sig_norm = np.nan_to_num(signals / backgrounds) / background_from_fit # interpolate to get evenly sampled in frequency space sig_interp = interpolate(nus, nus_new, sig_norm) # Fourier filter sig_filtered = fourier_filter(sig_interp, ffilter) # interpolate to get unevenly sampled in frequency space (back to original wavelength space) sig_uninterp = interpolate(nus_new, nus, sig_filtered[..., 0:2048]) # peak via the derivative sig_deriv = gaussian_filter1d(sig_uninterp, 50, order=1) sig_deriv -= peakback peak_pos = 1024 - np.argmax(sig_deriv, axis=-1) peak_pos *= px2fs peak_amp = np.amax(sig_deriv, axis=-1) return peak_pos, peak_amp, sig_deriv, sig_uninterp def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None): ''' takes an image, processes the signal, sends out the raw data and processed results ''' image = image.astype(int) camera_name = parameters["camera_name"] background = parameters.get("background_data") background_name = parameters.get("image_background") background_mode = parameters.get("image_background_enable") roi_signal = parameters.get("roi_signal") roi_background = parameters.get("roi_background") project_axis = parameters.get("project_axis", 0) threshold = parameters.get("threshold") # maintain the structure of processing_parameters background_shape = None # maintain the structure of res projection_signal = projection_background = None try: if background is not None: # background_shape = background.shape # image -= background projection_background = get_roi_projection(background, roi_signal, project_axis) # # if threshold is not None: # image -= threshold # image[image < 0] = 0 if roi_signal is not None: projection_signal = get_roi_projection(image, roi_signal, project_axis) # # if roi_background is not None: # projection_background = get_roi_projection(image, roi_background, project_axis) peak_pos, peak_amp, sig_deriv, sig_uninterp = edge("YAG", projection_background, projection_signal, 1, 0) except Exception as e: lineno = sys.exc_info()[2].tb_lineno tn = type(e).__name__ status = f"Error in line number {lineno}: {tn}: {e}" raise e else: status = "OK" processing_parameters = { "image_shape": image.shape, "background_shape": background_shape, "background_name": background_name, "background_mode": background_mode, "roi_signal": roi_signal, "roi_background": roi_background, "project_axis": project_axis, "threshold": threshold, "status": status } processing_parameters = json.dumps(processing_parameters) res = { camera_name + ".processing_parameters": processing_parameters, camera_name + ".projection_signal": projection_signal, camera_name + ".projection_background": projection_background, camera_name + ".edge_position": peak_pos, camera_name + ".edge_amplitude": peak_amp, camera_name + ".edge_derivative": sig_deriv, camera_name + ".edge_raw": sig_uninterp } return res