Files
camserver_sf/configuration/user_scripts/SATOP31-ATT01_Debug_proc.py
2025-04-28 14:35:09 +02:00

115 lines
3.4 KiB
Python

from collections import deque
from logging import getLogger
import numpy as np
from scipy import signal
_logger = getLogger(__name__)
buffer_dark = deque()
buffer_savgol = deque()
initialized = False
def initialize(params):
global buffer_dark, buffer_savgol, initialized
buffer_dark = deque(maxlen=params["buffer_length"])
buffer_savgol = deque(maxlen=params["buffer_length"])
initialized = True
def find_edge(data, step_length=50, edge_type="falling", roi=None):
"""
Find an edge in the given data using cross-correlation with a step waveform.
If a region-of-interest (roi) is provided as [start, end], the search is limited
to that slice of the data and the resulting edge position is offset appropriately.
"""
# If ROI is provided, slice the data accordingly.
if roi is not None:
data_roi = data[roi[0]:roi[1]]
else:
data_roi = data
# Prepare the step function.
step_waveform = np.ones(shape=(step_length,))
if edge_type == "rising":
step_waveform[: int(step_length / 2)] = -1
elif edge_type == "falling":
step_waveform[int(step_length / 2):] = -1
# Perform cross-correlation on the (possibly sliced) data.
xcorr = signal.correlate(data_roi, step_waveform, mode="valid")
edge_position = np.argmax(xcorr)
xcorr_amplitude = np.amax(xcorr)
# Correct edge_position for step_length.
edge_position += np.floor(step_length / 2)
# If ROI is provided, add the offset to get the position in the full signal.
if roi is not None:
edge_position += roi[0]
return {
"edge_pos": edge_position,
"xcorr": xcorr,
"xcorr_ampl": xcorr_amplitude,
"signal": data,
}
def process(data, pulse_id, timestamp, params):
device = params["device"]
step_length = params["step_length"]
edge_type = params["edge_type"]
dark_event = params["dark_event"]
fel_on_event = params["fel_on_event"]
calib = params["calib"]
filter_window = params["filter_window"]
prof_sig = data[params["prof_sig"]]
events = data[params["events"]]
if not initialized:
initialize(params)
prof_sig_savgol = signal.savgol_filter(prof_sig, filter_window, 3)
if events[dark_event]:
buffer_dark.append(prof_sig)
buffer_savgol.append(prof_sig_savgol)
if buffer_savgol:
prof_sig_norm = prof_sig_savgol / np.mean(np.array(buffer_savgol), axis=0)
else:
prof_sig_norm = prof_sig_savgol
if events[fel_on_event] and not events[dark_event]:
# Limit the edge search to the region-of-interest provided in params.
edge_results = find_edge(prof_sig_norm, step_length, edge_type, roi=params.get("roi"))
edge_results["arrival_time"] = np.polyval(calib, edge_results["edge_pos"])
else:
edge_results = {
"edge_pos": None,
"xcorr": None,
"xcorr_ampl": None,
"signal": prof_sig_norm,
}
edge_results["arrival_time"] = None
# Set beam-synchronization outputs.
output = {}
for key, value in edge_results.items():
output[f"{device}:{key}"] = value
output[f"{device}:raw_wf"] = prof_sig
output[f"{device}:raw_wf_savgol"] = prof_sig_savgol
if buffer_dark:
output[f"{device}:avg_dark_wf"] = np.mean(buffer_dark, axis=0)
else:
output[f"{device}:avg_dark_wf"] = None
return output