from collections import deque from logging import getLogger import numpy as np from scipy import signal _logger = getLogger(__name__) buffer_dark = deque() buffer_savgol = deque() initialized = False def initialize(params): global buffer_dark, buffer_savgol, initialized buffer_dark = deque(maxlen=params["buffer_length"]) buffer_savgol = deque(maxlen=params["buffer_length"]) initialized = True def find_edge(data, step_length=50, edge_type="falling"): # prepare a step function and refine it step_waveform = np.ones(shape=(step_length,)) if edge_type == "rising": step_waveform[: int(step_length / 2)] = -1 elif edge_type == "falling": step_waveform[int(step_length / 2) :] = -1 # find edges xcorr = signal.correlate(data, step_waveform, mode="valid") edge_position = np.argmax(xcorr) xcorr_amplitude = np.amax(xcorr) # correct edge_position for step_length edge_position += np.floor(step_length / 2) return { "edge_pos": edge_position, "xcorr": xcorr, "xcorr_ampl": xcorr_amplitude, "signal": data, } def process(data, pulse_id, timestamp, params): device = params["device"] step_length = params["step_length"] edge_type = params["edge_type"] dark_event = params["dark_event"] fel_on_event = params["fel_on_event"] calib = params["calib"] filter_window = params["filter_window"] # prof_sig = data[params["prof_sig"]] events = data[params["events"]] if not initialized: initialize(params) output = {} output[f"{device}:raw_wf"] = events return output