65 lines
1.6 KiB
Python
65 lines
1.6 KiB
Python
from collections import deque
|
|
from logging import getLogger
|
|
|
|
import numpy as np
|
|
from scipy import signal
|
|
|
|
_logger = getLogger(__name__)
|
|
|
|
buffer_dark = deque()
|
|
buffer_savgol = deque()
|
|
initialized = False
|
|
|
|
|
|
def initialize(params):
|
|
global buffer_dark, buffer_savgol, initialized
|
|
|
|
buffer_dark = deque(maxlen=params["buffer_length"])
|
|
buffer_savgol = deque(maxlen=params["buffer_length"])
|
|
|
|
initialized = True
|
|
|
|
|
|
def find_edge(data, step_length=50, edge_type="falling"):
|
|
# prepare a step function and refine it
|
|
step_waveform = np.ones(shape=(step_length,))
|
|
if edge_type == "rising":
|
|
step_waveform[: int(step_length / 2)] = -1
|
|
elif edge_type == "falling":
|
|
step_waveform[int(step_length / 2) :] = -1
|
|
|
|
# find edges
|
|
xcorr = signal.correlate(data, step_waveform, mode="valid")
|
|
edge_position = np.argmax(xcorr)
|
|
xcorr_amplitude = np.amax(xcorr)
|
|
|
|
# correct edge_position for step_length
|
|
edge_position += np.floor(step_length / 2)
|
|
|
|
return {
|
|
"edge_pos": edge_position,
|
|
"xcorr": xcorr,
|
|
"xcorr_ampl": xcorr_amplitude,
|
|
"signal": data,
|
|
}
|
|
|
|
|
|
def process(data, pulse_id, timestamp, params):
|
|
device = params["device"]
|
|
step_length = params["step_length"]
|
|
edge_type = params["edge_type"]
|
|
dark_event = params["dark_event"]
|
|
fel_on_event = params["fel_on_event"]
|
|
calib = params["calib"]
|
|
filter_window = params["filter_window"]
|
|
|
|
# prof_sig = data[params["prof_sig"]]
|
|
events = data[params["events"]]
|
|
|
|
if not initialized:
|
|
initialize(params)
|
|
output = {}
|
|
|
|
output[f"{device}:raw_wf"] = events
|
|
|
|
return output |