Files
camserver_sf/configuration/user_scripts/swissfel_spectral_processing_fast.py
2025-08-12 11:36:07 +02:00

306 lines
9.8 KiB
Python

from logging import getLogger
from cam_server.pipeline.data_processing import functions
from cam_server.utils import epics_lock, create_thread_pvs
from collections import deque
import threading
import json
import numpy as np
import scipy.signal
import numba
import time
from threading import Thread
# Configure Numba to use multiple threads and parallelize
numba.set_num_threads(4)
_logger = getLogger(__name__)
# Shared state globals
global_roi = [0, 0]
last_roi = None
params_json = None
initialized = False
sent_pid = -1
buffer = deque(maxlen=5)
channel_pv_names = []
base_pv_names = []
all_pv_names = []
global_ravg_length = 100
# Running-average state
global ravg_buffers, ravg_sum_map, ravg_lock
ravg_buffers = {}
ravg_sum_map = {}
ravg_lock = threading.Lock()
# N-shot average state
global global_avg_n, avg_buffer, avg_sum
global_avg_n = 100
avg_buffer = deque()
avg_sum = None
# Cached PV handles
pv_handles = None
# Update thread function
def update_PVs():
"""Continuously pop and write to EPICS PVs using cached handles."""
global pv_handles, buffer
while True:
try:
entry = buffer.popleft()
except IndexError:
time.sleep(0.001)
continue
for pv, val in zip(pv_handles, entry):
if pv and pv.connected and val is not None:
pv.put(val)
def initialize(params):
"""Initialize PV names, caches, and start update thread."""
global channel_pv_names, base_pv_names, all_pv_names
global global_ravg_length, global_avg_n, avg_buffer, avg_sum, pv_handles
camera = params["camera_name"]
e_int = params["e_int_name"]
e_axis = params["e_axis_name"]
# Fit/result PV names
center_pv = f"{camera}:FIT-COM"
fwhm_pv = f"{camera}:FIT-FWHM"
fit_rms_pv = f"{camera}:FIT-RMS"
fit_res_pv = f"{camera}:FIT-RES"
fit_spec_pv = f"{camera}:FIT-SPECTRUM_Y"
# ROI PVs
ymin_pv = f"{camera}:SPC_ROI_YMIN"
ymax_pv = f"{camera}:SPC_ROI_YMAX"
channel_pv_names = [ymin_pv, ymax_pv, e_axis]
# Stats PVs
com_pv = f"{camera}:SPECT-COM"
std_pv = f"{camera}:SPECT-RMS"
skew_pv = f"{camera}:SPECT-SKEW"
iqr_pv = f"{camera}:SPECT-IQR"
res_pv = f"{camera}:SPECT-RES"
# Base PV list
base_pv_names = [
e_int, center_pv, fwhm_pv, fit_rms_pv,
fit_res_pv, fit_spec_pv, com_pv, std_pv, skew_pv, iqr_pv, res_pv
]
# Running-average PV names
global_ravg_length = params.get('RAVG_length', global_ravg_length)
exclude = {e_int, e_axis, f"{camera}:processing_parameters"}
ravg_base = [pv for pv in base_pv_names if pv not in exclude]
ravg_pv_names = [pv + '-RAVG' for pv in ravg_base]
# N-shot average settings
global_avg_n = params.get('avg_nshots', global_avg_n)
avg_buffer = deque(maxlen=global_avg_n)
avg_sum = None
avg_pv_names = [
f"{camera}:AVG-FIT-COM",
f"{camera}:AVG-FIT-FWHM",
f"{camera}:AVG-FIT-RMS",
f"{camera}:AVG-FIT-RES",
f"{camera}:AVG-SPECT-COM",
f"{camera}:AVG-SPECT-RMS",
f"{camera}:AVG-SPECT-SKEW",
f"{camera}:AVG-SPECT-IQR",
f"{camera}:AVG-SPECT-RES",
f"{camera}:AVG-SPECTRUM_Y",
f"{camera}:AVG-FIT-SPECTRUM_Y"
]
# All PVs
all_pv_names = base_pv_names + ravg_pv_names + avg_pv_names + [f"{camera}:processing_parameters"]
# Cache PV handles
pv_handles = create_thread_pvs(all_pv_names)
# Start update thread
thread = Thread(target=update_PVs, daemon=True)
thread.start()
def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None, background=None):
"""
Fast processing: vectorized spectrum, incremental averages, cached PVs.
"""
global initialized, sent_pid, buffer
global global_roi, last_roi, params_json
global ravg_buffers, ravg_sum_map, ravg_lock
global avg_buffer, avg_sum, global_avg_n, all_pv_names
if not initialized:
initialize(parameters)
initialized = True
camera = parameters["camera_name"]
# Read ROI once and detect change
ymin, ymax = global_roi
new_ymin = parameters.get('roi_ymin', ymin)
new_ymax = parameters.get('roi_ymax', ymax)
if (new_ymin, new_ymax) != (ymin, ymax):
global_roi[:] = [new_ymin, new_ymax]
roi_changed = True
else:
roi_changed = False
# Preprocess image
img = image.astype(np.float32)
pixel_bkg = parameters.get("pixel_bkg", 0)
proc_img = img - pixel_bkg
nrows, _ = proc_img.shape
# Background subtraction
bg = parameters.pop('background_data', None)
if isinstance(bg, np.ndarray) and bg.shape == proc_img.shape:
proc_img -= bg.astype(np.float32)
# Crop ROI
ymin_i, ymax_i = int(global_roi[0]), int(global_roi[1])
if 0 <= ymin_i < ymax_i <= nrows:
proc_img = proc_img[ymin_i:ymax_i, :]
# Trim x_axis to match the cropped image width
axis = x_axis[:proc_img.shape[1]]
# Spectrum via vector sum
spectrum = np.sum(proc_img, axis=0)
# Smooth
smoothed = scipy.signal.savgol_filter(spectrum, 51, 3)
# Fit Gaussian
minimum, maximum = smoothed.min(), smoothed.max()
amplitude = maximum - minimum
skip = amplitude <= nrows * 1.5
offset, amp_fit, center, sigma = functions.gauss_fit_psss(
smoothed[::2],
axis[:len(smoothed)][::2],
offset=minimum,
amplitude=amplitude,
skip=skip,
maxfev=10
)
fit_spectrum = offset + amp_fit * np.exp(
-((axis[:len(smoothed)] - center)**2) / (2 * sigma**2)
)
# Compute stats
sm_norm = smoothed / np.sum(smoothed)
spect_com = np.dot(axis[:len(sm_norm)], sm_norm)
spect_std = np.sqrt(np.dot((axis[:len(sm_norm)] - spect_com)**2, sm_norm))
spect_skew = np.dot((axis[:len(sm_norm)] - spect_com)**3, sm_norm) / (spect_std**3)
cum = np.cumsum(sm_norm)
e25 = np.interp(0.25, cum, axis[:len(cum)])
e75 = np.interp(0.75, cum, axis[:len(cum)])
spect_iqr = e75 - e25
spect_sum = spectrum.sum()
# Original result
result = {
parameters["e_int_name"]: spectrum,
parameters["e_axis_name"]: axis,
f"{camera}:SPECTRUM_Y_SUM": spect_sum,
f"{camera}:FIT-COM": np.float64(center),
f"{camera}:FIT-FWHM": np.float64(2.355 * sigma),
f"{camera}:FIT-RMS": np.float64(sigma),
f"{camera}:FIT-RES": np.float64(2.355 * sigma / center * 1000),
f"{camera}:FIT-SPECTRUM_Y": fit_spectrum,
f"{camera}:SPECT-COM": spect_com,
f"{camera}:SPECT-RMS": spect_std,
f"{camera}:SPECT-SKEW": spect_skew,
f"{camera}:SPECT-IQR": spect_iqr,
f"{camera}:SPECT-RES": np.float64(spect_iqr / spect_com * 1000)
}
# Update JSON PV only on ROI change
if roi_changed:
params_json = json.dumps({"roi": global_roi})
result[f"{camera}:processing_parameters"] = params_json
# Running averages (thread-safe, incremental sum)
ravg_results = {}
exclude = {
parameters["e_int_name"],
parameters["e_axis_name"],
f"{camera}:processing_parameters"
}
with ravg_lock:
for pv in base_pv_names:
if pv not in exclude:
buf = ravg_buffers.setdefault(pv, deque(maxlen=global_ravg_length))
sum_val = ravg_sum_map.get(pv, 0.0)
if len(buf) == buf.maxlen:
old = buf.popleft()
sum_val -= old
buf.append(result[pv])
sum_val += result[pv]
ravg_sum_map[pv] = sum_val
ravg_results[pv + "-RAVG"] = sum_val / len(buf)
# N-shot average (incremental, no full-stack)
global avg_sum
if avg_sum is None:
avg_sum = np.zeros_like(spectrum)
if len(avg_buffer) == global_avg_n:
old = avg_buffer.popleft()
avg_sum -= old
avg_buffer.append(spectrum)
avg_sum += spectrum
avg_spectrum = avg_sum / len(avg_buffer)
# Fit and stats on avg_spectrum
sm_avg = scipy.signal.savgol_filter(avg_spectrum, 51, 3)
min_a, max_a = sm_avg.min(), sm_avg.max()
amp_a = max_a - min_a
skip_a = amp_a <= nrows * 1.5
offs_a, amp_fit_a, center_a, sigma_a = functions.gauss_fit_psss(
sm_avg[::2],
axis[:len(sm_avg)][::2],
offset=min_a,
amplitude=amp_a,
skip=skip_a,
maxfev=10
)
fit_avg_spectrum = offs_a + amp_fit_a * np.exp(
-((axis[:len(sm_avg)] - center_a)**2) / (2 * sigma_a**2)
)
sm_norm_a = sm_avg / np.sum(sm_avg)
spect_com_a = np.dot(axis[:len(sm_norm_a)], sm_norm_a)
spect_std_a = np.sqrt(np.dot((axis[:len(sm_norm_a)] - spect_com_a)**2, sm_norm_a))
spect_skew_a = np.dot((axis[:len(sm_norm_a)] - spect_com_a)**3, sm_norm_a) / (spect_std_a**3)
cum_a = np.cumsum(sm_norm_a)
e25_a = np.interp(0.25, cum_a, axis[:len(cum_a)])
e75_a = np.interp(0.75, cum_a, axis[:len(cum_a)])
spect_iqr_a = e75_a - e25_a
spect_res_a = spect_iqr_a / spect_com_a * 1000
avg_results = {
f"{camera}:AVG-FIT-COM": np.float64(center_a),
f"{camera}:AVG-FIT-FWHM": np.float64(2.355 * sigma_a),
f"{camera}:AVG-FIT-RMS": np.float64(sigma_a),
f"{camera}:AVG-FIT-RES": np.float64(2.355 * sigma_a / center_a * 1000),
f"{camera}:AVG-SPECT-COM": spect_com_a,
f"{camera}:AVG-SPECT-RMS": spect_std_a,
f"{camera}:AVG-SPECT-SKEW": spect_skew_a,
f"{camera}:AVG-SPECT-IQR": spect_iqr_a,
f"{camera}:AVG-SPECT-RES": np.float64(spect_res_a),
f"{camera}:AVG-SPECTRUM_Y": avg_spectrum,
f"{camera}:AVG-FIT-SPECTRUM_Y": fit_avg_spectrum
}
# Merge and queue for PV update
full = {**result, **ravg_results, **avg_results}
if epics_lock.acquire(False):
try:
if pulse_id > sent_pid:
sent_pid = pulse_id
buffer.append(tuple(full[pv] for pv in all_pv_names))
finally:
epics_lock.release()
return full