Files
camserver_sf/configuration/user_scripts/swissfel_spectral_processing.py
2025-08-12 11:36:07 +02:00

311 lines
12 KiB
Python

from logging import getLogger
from cam_server.pipeline.data_processing import functions
from cam_server.utils import create_thread_pvs, epics_lock
from collections import deque
import json
import numpy as np
import scipy.signal
import numba
import time
import sys
from threading import Thread
# Configure Numba to use multiple threads
numba.set_num_threads(4)
_logger = getLogger(__name__)
# Shared state globals
global_roi = [0, 0]
initialized = False
sent_pid = -1
buffer = deque(maxlen=5)
channel_pv_names = None
base_pv_names = []
all_pv_names = []
global_ravg_length = 100
ravg_buffers = {}
# Configuration for rolling-average of statistics
# Configuration for N-shot average spectrum
global_avg_length = 100
avg_buffer = None
avg_pv_names = []
@numba.njit(parallel=False)
def get_spectrum(image, background):
"""Compute background-subtracted spectrum via row-wise summation."""
y, x = image.shape
profile = np.zeros(x, dtype=np.float64)
for i in numba.prange(y):
for j in range(x):
profile[j] += image[i, j] - background[i, j]
return profile
def update_PVs(buffer, *pv_names):
pvs = create_thread_pvs(list(pv_names))
while True:
time.sleep(0.1)
try:
rec = buffer.popleft()
except IndexError:
continue
for pv, val in zip(pvs, rec):
if pv and pv.connected and (val is not None):
try:
pv.put(val)
except Exception as e:
_logger.error(f"Error writing to {pv.pvname}: {e}")
def initialize(params):
"""Initialize PV names, running-average settings, N-shot average, and launch update thread."""
global channel_pv_names, base_pv_names, all_pv_names, global_ravg_length
global global_avg_length, avg_buffer, avg_pv_names
camera = params["camera_name"]
e_int = params["e_int_name"]
e_axis = params["e_axis_name"]
# Fit/result PV names
center_pv = f"{camera}:FIT-COM"
fwhm_pv = f"{camera}:FIT-FWHM"
fit_rms_pv = f"{camera}:FIT-RMS"
fit_res_pv = f"{camera}:FIT-RES"
fit_spec_pv = f"{camera}:FIT-SPECTRUM_Y"
# ROI PVs for dynamic read
ymin_pv = f"{camera}:SPC_ROI_YMIN"
ymax_pv = f"{camera}:SPC_ROI_YMAX"
axis_pv = e_axis
channel_pv_names = [ymin_pv, ymax_pv, axis_pv]
# Spectrum statistical PV names
com_pv = f"{camera}:SPECT-COM"
std_pv = f"{camera}:SPECT-RMS"
skew_pv = f"{camera}:SPECT-SKEW"
iqr_pv = f"{camera}:SPECT-IQR"
res_pv = f"{camera}:SPECT-RES"
# Base PVs for update thread (order matters)
base_pv_names = [
e_int, center_pv, fwhm_pv, fit_rms_pv,
fit_res_pv, fit_spec_pv, com_pv, std_pv, skew_pv, iqr_pv, res_pv
]
# Running-average configuration
global_ravg_length = params.get('RAVG_length', global_ravg_length)
exclude = {e_int, e_axis, f"{camera}:processing_parameters"}
ravg_base = [pv for pv in base_pv_names if pv not in exclude]
ravg_pv_names = [pv + '-RAVG' for pv in ravg_base]
# N-shot average configuration
global_avg_length = params.get('avg_nshots', global_avg_length)
avg_buffer = deque(maxlen=global_avg_length)
# Define PVs for N-shot average statistics and spectra
avg_pv_names = [
f"{camera}:AVG-FIT-COM",
f"{camera}:AVG-FIT-FWHM",
f"{camera}:AVG-FIT-RMS",
f"{camera}:AVG-FIT-RES",
f"{camera}:AVG-SPECT-COM",
f"{camera}:AVG-SPECT-RMS",
f"{camera}:AVG-SPECT-SKEW",
f"{camera}:AVG-SPECT-IQR",
f"{camera}:AVG-SPECT-RES",
f"{camera}:AVG-SPECTRUM_Y",
f"{camera}:AVG-FIT-SPECTRUM_Y"
]
global ARRAY_PVS
ARRAY_PVS = set([
f"{camera}:SPECTRUM_Y",
f"{camera}:FIT-SPECTRUM_Y",
f"{camera}:AVG-SPECTRUM_Y",
f"{camera}:AVG-FIT-SPECTRUM_Y",
f"{camera}:FIT-SPECTRUM_Y-RAVG"
# add any others here
])
# All PVs (original + running average + N-shot average)
all_pv_names = base_pv_names + ravg_pv_names + avg_pv_names
# Start background thread for PV updates
thread = Thread(target=update_PVs, args=(buffer, *all_pv_names), daemon=True)
thread.start()
def _pv_safe(val, pvname):
if pvname in ARRAY_PVS:
# array PV: must always send a 1D numpy array
if isinstance(val, np.ndarray):
if val.ndim == 1:
return val.astype(np.float64)
elif val.ndim == 0:
# Scalar array, wrap as 1-element
return np.array([val.item()], dtype=np.float64)
else:
raise ValueError(f"{pvname}: expected 1D array, got shape {val.shape}")
elif isinstance(val, (list, tuple)):
return np.array(val, dtype=np.float64)
elif isinstance(val, (float, int, np.generic)):
return np.array([val], dtype=np.float64)
else:
raise TypeError(f"{pvname}: expected array, got {type(val)}")
else:
# scalar PV: must always send float
if isinstance(val, np.ndarray):
if val.size == 1:
return float(val.item())
else:
raise ValueError(f"{pvname}: expected scalar, got array of size {val.size}")
elif isinstance(val, (np.generic, float, int)):
return float(val)
else:
raise TypeError(f"{pvname}: expected scalar, got {type(val)}")
def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None, background=None):
"""
Main entrypoint: subtract background, crop ROI, smooth, fit Gaussian,
compute metrics, N-shot average, queue PV updates (with running averages).
Returns a dict of processed PV values (original and average channels).
"""
global initialized, sent_pid, channel_pv_names
global global_ravg_length, ravg_buffers, avg_buffer
try:
if not initialized:
initialize(parameters)
initialized = True
camera = parameters["camera_name"]
# Dynamic ROI and axis PV read
ymin_pv, ymax_pv, axis_pv = create_thread_pvs(channel_pv_names)
if ymin_pv and ymin_pv.connected:
global_roi[0] = ymin_pv.value
if ymax_pv and ymax_pv.connected:
global_roi[1] = ymax_pv.value
if not (axis_pv and axis_pv.connected):
_logger.warning("Energy axis not connected")
return None
axis = axis_pv.value[:image.shape[1]]
# Preprocess image
proc_img = image.astype(np.float32) - np.float32(parameters.get("pixel_bkg", 0))
nrows, _ = proc_img.shape
# Background image
bg_img = parameters.pop('background_data', None)
bg_img = bg_img.astype(np.float32) if isinstance(bg_img, np.ndarray) and bg_img.shape == proc_img.shape else None
# Crop ROI
ymin, ymax = map(int, global_roi)
if 0 <= ymin < ymax <= nrows:
proc_img = proc_img[ymin:ymax, :]
if bg_img is not None:
bg_img = bg_img[ymin:ymax, :]
# Extract spectrum and fitted spectrum
spectrum = get_spectrum(proc_img, bg_img) if bg_img is not None else np.sum(proc_img, axis=0)
smoothed = scipy.signal.savgol_filter(spectrum, 51, 3)
minimum, maximum = smoothed.min(), smoothed.max()
amplitude = maximum - minimum
skip = amplitude <= nrows * 1.5
offset, amp_fit, center, sigma = functions.gauss_fit_psss(
smoothed[::2], axis[::2], offset=minimum, amplitude=amplitude, skip=skip, maxfev=10
)
# Reconstruct fitted curve
fit_spectrum = offset + amp_fit * np.exp(-((axis - center)**2) / (2 * sigma**2))
#fit_spectrum = np.abs(fit_spectrum)
# Moments
sm_norm = smoothed / np.sum(smoothed)
spect_com = np.sum(axis * sm_norm)
spect_std = np.sqrt(np.sum((axis - spect_com)**2 * sm_norm))
spect_skew = np.sum((axis - spect_com)**3 * sm_norm) / (spect_std**3)
cum = np.cumsum(sm_norm); e25 = np.interp(0.25, cum, axis); e75 = np.interp(0.75, cum, axis)
spect_iqr = e75 - e25; spect_sum = np.sum(spectrum)
# Original result
result = {
parameters["e_int_name"]: spectrum,
parameters["e_axis_name"]: axis,
f"{camera}:SPECTRUM_Y_SUM": spect_sum,
f"{camera}:FIT-COM": np.float64(center),
f"{camera}:FIT-FWHM": np.float64(2.355 * sigma),
f"{camera}:FIT-RMS": np.float64(sigma),
f"{camera}:FIT-RES": np.float64(2.355 * sigma / center * 1000),
f"{camera}:FIT-SPECTRUM_Y": np.float64(fit_spectrum),
f"{camera}:SPECT-COM": spect_com,
f"{camera}:SPECT-RMS": spect_std,
f"{camera}:SPECT-SKEW": spect_skew,
f"{camera}:SPECT-IQR": spect_iqr,
f"{camera}:SPECT-RES": np.float64(spect_iqr / spect_com * 1000),
f"{camera}:processing_parameters": json.dumps({"roi": global_roi})
}
# Rolling averages
exclude = {parameters["e_int_name"], parameters["e_axis_name"], f"{camera}:processing_parameters"}
ravg_results = {}
for base_pv in (pv for pv in base_pv_names if pv not in exclude):
buf = ravg_buffers.setdefault(base_pv, deque(maxlen=global_ravg_length))
buf.append(result[base_pv])
ravg_results[f"{base_pv}-RAVG"] = np.mean(buf)
# N-shot average and average fitted spectrum
avg_buffer.append(spectrum)
avg_spectrum = np.mean(np.stack(avg_buffer), axis=0)
avg_spectrum = np.abs(avg_spectrum)
fit_avg = offset + amp_fit * np.exp(-((axis - center)**2) / (2 * sigma**2)) # using avg fit params below
sm_avg = scipy.signal.savgol_filter(avg_spectrum, 51, 3)
min_a, max_a = sm_avg.min(), sm_avg.max()
amp_a = max_a - min_a; skip_a = amp_a <= nrows * 1.5
offs_a, amp_fit_a, center_a, sigma_a = functions.gauss_fit_psss(
sm_avg[::2], axis[::2], offset=min_a, amplitude=amp_a, skip=skip_a, maxfev=10
)
fit_avg_spectrum = offs_a + amp_fit_a * np.exp(-((axis - center_a)**2) / (2 * sigma_a**2))
fit_avg_spectrum = np.abs(fit_avg_spectrum)
# Average moments
sm_norm_a = sm_avg / np.sum(sm_avg)
spect_com_a = np.sum(axis * sm_norm_a)
spect_std_a = np.sqrt(np.sum((axis - spect_com_a)**2 * sm_norm_a))
spect_skew_a = np.sum((axis - spect_com_a)**3 * sm_norm_a) / (spect_std_a**3)
cum_a = np.cumsum(sm_norm_a); e25_a = np.interp(0.25, cum_a, axis); e75_a = np.interp(0.75, cum_a, axis)
spect_iqr_a = e75_a - e25_a
spect_res_a = spect_iqr_a / spect_com_a * 1000
avg_results = {
f"{camera}:AVG-FIT-COM": np.float64(center),
f"{camera}:AVG-FIT-FWHM": np.float64(2.355 * sigma_a),
f"{camera}:AVG-FIT-RMS": np.float64(sigma_a),
f"{camera}:AVG-FIT-RES": np.float64(2.355 * sigma_a / center_a * 1000),
f"{camera}:AVG-SPECT-COM": spect_com,
f"{camera}:AVG-SPECT-RMS": spect_std_a,
f"{camera}:AVG-SPECT-SKEW": spect_skew_a,
f"{camera}:AVG-SPECT-IQR": spect_iqr_a,
f"{camera}:AVG-SPECT-RES": np.float64(spect_res_a),
f"{camera}:AVG-SPECTRUM_Y": avg_spectrum,
f"{camera}:AVG-FIT-SPECTRUM_Y": fit_avg_spectrum
}
# Merge and queue
full_results = {**result, **ravg_results, **avg_results}
if epics_lock.acquire(False):
try:
if pulse_id > sent_pid:
sent_pid = pulse_id
#entry = tuple(full_results.get(pv) for pv in all_pv_names)
entry = tuple(_pv_safe(full_results.get(pv), pv) for pv in all_pv_names)
buffer.append(entry)
finally:
epics_lock.release()
return full_results
except Exception as ex:
_logger.warning("Processing error: %s", ex)
return {}