from collections import deque from logging import getLogger import numpy as np from scipy import signal _logger = getLogger(__name__) buffer_dark = deque() buffer_savgol = deque() initialized = False def initialize(params): global buffer_dark, buffer_savgol, initialized buffer_dark = deque(maxlen=params["buffer_length"]) buffer_savgol = deque(maxlen=params["buffer_length"]) initialized = True def find_edge(data, step_length=50, edge_type="falling", roi=None): """ Find an edge in the given data using cross-correlation with a step waveform. If a region-of-interest (roi) is provided as [start, end], the search is limited to that slice of the data and the resulting edge position is offset appropriately. """ # If ROI is provided, slice the data accordingly. if roi is not None: data_roi = data[roi[0]:roi[1]] else: data_roi = data # Prepare the step function. step_waveform = np.ones(shape=(step_length,)) if edge_type == "rising": step_waveform[: int(step_length / 2)] = -1 elif edge_type == "falling": step_waveform[int(step_length / 2):] = -1 # Perform cross-correlation on the (possibly sliced) data. xcorr = signal.correlate(data_roi, step_waveform, mode="valid") edge_position = np.argmax(xcorr) xcorr_amplitude = np.amax(xcorr) # Correct edge_position for step_length. edge_position += np.floor(step_length / 2) # If ROI is provided, add the offset to get the position in the full signal. if roi is not None: edge_position += roi[0] return { "edge_pos": edge_position, "xcorr": xcorr, "xcorr_ampl": xcorr_amplitude, "signal": data, } def process(data, pulse_id, timestamp, params): device = params["device"] step_length = params["step_length"] edge_type = params["edge_type"] dark_event = params["dark_event"] fel_on_event = params["fel_on_event"] calib = params["calib"] filter_window = params["filter_window"] prof_sig = data[params["prof_sig"]] events = data[params["events"]] if not initialized: initialize(params) prof_sig_savgol = signal.savgol_filter(prof_sig, filter_window, 3) if events[dark_event]: buffer_dark.append(prof_sig) buffer_savgol.append(prof_sig_savgol) if buffer_savgol: prof_sig_norm = prof_sig_savgol / np.mean(np.array(buffer_savgol), axis=0) else: prof_sig_norm = prof_sig_savgol if events[fel_on_event] and not events[dark_event]: # Limit the edge search to the region-of-interest provided in params. edge_results = find_edge(prof_sig_norm, step_length, edge_type, roi=params.get("roi")) edge_results["arrival_time"] = np.polyval(calib, edge_results["edge_pos"]) else: edge_results = { "edge_pos": None, "xcorr": None, "xcorr_ampl": None, "signal": prof_sig_norm, } edge_results["arrival_time"] = None # Set beam-synchronization outputs. output = {} for key, value in edge_results.items(): output[f"{device}:{key}"] = value output[f"{device}:raw_wf"] = prof_sig output[f"{device}:raw_wf_savgol"] = prof_sig_savgol if buffer_dark: output[f"{device}:avg_dark_wf"] = np.mean(buffer_dark, axis=0) else: output[f"{device}:avg_dark_wf"] = None return output