from collections import deque
from logging import getLogger

import numpy as np
from scipy import signal

_logger = getLogger(__name__)

# Module-level rolling buffers of waveforms recorded on dark events: the raw
# profile and its Savitzky-Golay filtered version. They start unbounded and
# are replaced by bounded deques in initialize().
buffer_dark = deque()
buffer_savgol = deque()
initialized = False


def initialize(params):
    """Create the bounded dark-waveform buffers and mark the module ready.

    Args:
        params: Mapping that must contain ``"buffer_length"``, used as the
            ``maxlen`` of both buffers.
    """
    global buffer_dark, buffer_savgol, initialized

    buffer_dark = deque(maxlen=params["buffer_length"])
    buffer_savgol = deque(maxlen=params["buffer_length"])
    initialized = True


def find_edge(data, step_length=50, edge_type="falling"):
    """Locate an edge in ``data`` by cross-correlating with a step template.

    The template has ``step_length`` samples: one half is +1 and the other
    half is -1, with the order chosen by ``edge_type``. The edge position is
    the index of the maximum of the "valid"-mode cross-correlation, shifted by
    ``floor(step_length / 2)`` to point at the middle of the template.

    Args:
        data: Waveform to search (passed to ``scipy.signal.correlate``).
        step_length: Number of samples in the step template.
        edge_type: ``"rising"`` (-1 then +1) or ``"falling"`` (+1 then -1).

    Returns:
        dict with keys ``"edge_pos"``, ``"xcorr"``, ``"xcorr_ampl"`` and
        ``"signal"`` (the unmodified ``data``). ``"edge_pos"`` is a float
        because the offset is computed with ``np.floor``.

    Raises:
        ValueError: If ``edge_type`` is neither ``"rising"`` nor
            ``"falling"``. Without this check the template would stay all
            ones and the returned position would be meaningless.
    """
    half_length = int(step_length / 2)

    # prepare a step function
    step_waveform = np.ones(shape=(step_length,))
    if edge_type == "rising":
        step_waveform[:half_length] = -1
    elif edge_type == "falling":
        step_waveform[half_length:] = -1
    else:
        raise ValueError(
            f"edge_type must be 'rising' or 'falling', got {edge_type!r}"
        )

    # find edges
    xcorr = signal.correlate(data, step_waveform, mode="valid")
    edge_position = np.argmax(xcorr)
    xcorr_amplitude = np.amax(xcorr)

    # correct edge_position for step_length
    edge_position += np.floor(step_length / 2)

    return {
        "edge_pos": edge_position,
        "xcorr": xcorr,
        "xcorr_ampl": xcorr_amplitude,
        "signal": data,
    }


def process(data, pulse_id, timestamp, params):
    """Process one event: filter, normalise by dark average, find the edge.

    On a dark event the raw and filtered profiles are appended to the
    module-level buffers. The filtered profile is divided by the mean of the
    buffered filtered dark profiles (when any exist). An edge search is run
    only when the FEL-on event flag is set and the dark flag is not.

    Args:
        data: Mapping holding the profile waveform under
            ``params["prof_sig"]`` and the event flags under
            ``params["events"]``.
        pulse_id: Not used by this function.
        timestamp: Not used by this function.
        params: Mapping with keys ``"device"``, ``"step_length"``,
            ``"edge_type"``, ``"dark_event"``, ``"fel_on_event"``, ``"calib"``,
            ``"filter_window"``, ``"prof_sig"``, ``"events"`` and (on first
            call) ``"buffer_length"``.

    Returns:
        dict keyed by ``"<device>:<name>"`` with names ``edge_pos``, ``xcorr``,
        ``xcorr_ampl``, ``signal``, ``arrival_time``, ``raw_wf``,
        ``raw_wf_savgol`` and ``avg_dark_wf``. Edge-related values are ``None``
        when no edge search was performed; ``avg_dark_wf`` is ``None`` while
        the dark buffer is empty.
    """
    device = params["device"]
    step_length = params["step_length"]
    edge_type = params["edge_type"]
    dark_event = params["dark_event"]
    fel_on_event = params["fel_on_event"]
    calib = params["calib"]
    filter_window = params["filter_window"]

    prof_sig = data[params["prof_sig"]]
    events = data[params["events"]]

    if not initialized:
        initialize(params)

    # Savitzky-Golay smoothing with a cubic polynomial.
    prof_sig_savgol = signal.savgol_filter(prof_sig, filter_window, 3)

    if events[dark_event]:
        buffer_dark.append(prof_sig)
        buffer_savgol.append(prof_sig_savgol)

    # Normalise by the average filtered dark profile once one is available.
    if buffer_savgol:
        prof_sig_norm = prof_sig_savgol / np.mean(np.array(buffer_savgol), axis=0)
    else:
        prof_sig_norm = prof_sig_savgol

    if events[fel_on_event] and not events[dark_event]:
        edge_results = find_edge(prof_sig_norm, step_length, edge_type)
        # calib holds polynomial coefficients applied to the edge position.
        edge_results["arrival_time"] = np.polyval(calib, edge_results["edge_pos"])
    else:
        edge_results = {
            "edge_pos": None,
            "xcorr": None,
            "xcorr_ampl": None,
            "signal": prof_sig_norm,
            "arrival_time": None,
        }

    # Set bs outputs
    output = {}
    for key, value in edge_results.items():
        output[f"{device}:{key}"] = value

    output[f"{device}:raw_wf"] = prof_sig
    output[f"{device}:raw_wf_savgol"] = prof_sig_savgol

    if buffer_dark:
        output[f"{device}:avg_dark_wf"] = np.mean(buffer_dark, axis=0)
    else:
        # Previously np.zeros_like(prof_sig); changed to None by the original
        # author to avoid type errors.
        output[f"{device}:avg_dark_wf"] = None

    return output