311 lines
12 KiB
Python
311 lines
12 KiB
Python
from logging import getLogger
|
|
from cam_server.pipeline.data_processing import functions
|
|
from cam_server.utils import create_thread_pvs, epics_lock
|
|
from collections import deque
|
|
import json
|
|
import numpy as np
|
|
import scipy.signal
|
|
import numba
|
|
import time
|
|
import sys
|
|
from threading import Thread
|
|
|
|
# Configure Numba to use multiple threads
|
|
numba.set_num_threads(4)
|
|
|
|
_logger = getLogger(__name__)
|
|
|
|
# Shared state globals
|
|
global_roi = [0, 0]
|
|
initialized = False
|
|
sent_pid = -1
|
|
buffer = deque(maxlen=5)
|
|
channel_pv_names = None
|
|
base_pv_names = []
|
|
all_pv_names = []
|
|
global_ravg_length = 100
|
|
ravg_buffers = {}
|
|
# Configuration for rolling-average of statistics
|
|
|
|
# Configuration for N-shot average spectrum
|
|
global_avg_length = 100
|
|
avg_buffer = None
|
|
avg_pv_names = []
|
|
|
|
@numba.njit(parallel=False)
|
|
def get_spectrum(image, background):
|
|
"""Compute background-subtracted spectrum via row-wise summation."""
|
|
y, x = image.shape
|
|
profile = np.zeros(x, dtype=np.float64)
|
|
for i in numba.prange(y):
|
|
for j in range(x):
|
|
profile[j] += image[i, j] - background[i, j]
|
|
return profile
|
|
|
|
def update_PVs(buffer, *pv_names):
|
|
pvs = create_thread_pvs(list(pv_names))
|
|
while True:
|
|
time.sleep(0.1)
|
|
try:
|
|
rec = buffer.popleft()
|
|
except IndexError:
|
|
continue
|
|
for pv, val in zip(pvs, rec):
|
|
if pv and pv.connected and (val is not None):
|
|
try:
|
|
pv.put(val)
|
|
except Exception as e:
|
|
_logger.error(f"Error writing to {pv.pvname}: {e}")
|
|
|
|
|
|
|
|
def initialize(params):
|
|
"""Initialize PV names, running-average settings, N-shot average, and launch update thread."""
|
|
global channel_pv_names, base_pv_names, all_pv_names, global_ravg_length
|
|
global global_avg_length, avg_buffer, avg_pv_names
|
|
|
|
camera = params["camera_name"]
|
|
e_int = params["e_int_name"]
|
|
e_axis = params["e_axis_name"]
|
|
|
|
# Fit/result PV names
|
|
center_pv = f"{camera}:FIT-COM"
|
|
fwhm_pv = f"{camera}:FIT-FWHM"
|
|
fit_rms_pv = f"{camera}:FIT-RMS"
|
|
fit_res_pv = f"{camera}:FIT-RES"
|
|
fit_spec_pv = f"{camera}:FIT-SPECTRUM_Y"
|
|
|
|
# ROI PVs for dynamic read
|
|
ymin_pv = f"{camera}:SPC_ROI_YMIN"
|
|
ymax_pv = f"{camera}:SPC_ROI_YMAX"
|
|
axis_pv = e_axis
|
|
channel_pv_names = [ymin_pv, ymax_pv, axis_pv]
|
|
|
|
# Spectrum statistical PV names
|
|
com_pv = f"{camera}:SPECT-COM"
|
|
std_pv = f"{camera}:SPECT-RMS"
|
|
skew_pv = f"{camera}:SPECT-SKEW"
|
|
iqr_pv = f"{camera}:SPECT-IQR"
|
|
res_pv = f"{camera}:SPECT-RES"
|
|
|
|
# Base PVs for update thread (order matters)
|
|
base_pv_names = [
|
|
e_int, center_pv, fwhm_pv, fit_rms_pv,
|
|
fit_res_pv, fit_spec_pv, com_pv, std_pv, skew_pv, iqr_pv, res_pv
|
|
]
|
|
|
|
# Running-average configuration
|
|
global_ravg_length = params.get('RAVG_length', global_ravg_length)
|
|
exclude = {e_int, e_axis, f"{camera}:processing_parameters"}
|
|
ravg_base = [pv for pv in base_pv_names if pv not in exclude]
|
|
ravg_pv_names = [pv + '-RAVG' for pv in ravg_base]
|
|
|
|
# N-shot average configuration
|
|
global_avg_length = params.get('avg_nshots', global_avg_length)
|
|
avg_buffer = deque(maxlen=global_avg_length)
|
|
# Define PVs for N-shot average statistics and spectra
|
|
avg_pv_names = [
|
|
f"{camera}:AVG-FIT-COM",
|
|
f"{camera}:AVG-FIT-FWHM",
|
|
f"{camera}:AVG-FIT-RMS",
|
|
f"{camera}:AVG-FIT-RES",
|
|
f"{camera}:AVG-SPECT-COM",
|
|
f"{camera}:AVG-SPECT-RMS",
|
|
f"{camera}:AVG-SPECT-SKEW",
|
|
f"{camera}:AVG-SPECT-IQR",
|
|
f"{camera}:AVG-SPECT-RES",
|
|
f"{camera}:AVG-SPECTRUM_Y",
|
|
f"{camera}:AVG-FIT-SPECTRUM_Y"
|
|
]
|
|
global ARRAY_PVS
|
|
ARRAY_PVS = set([
|
|
f"{camera}:SPECTRUM_Y",
|
|
f"{camera}:FIT-SPECTRUM_Y",
|
|
f"{camera}:AVG-SPECTRUM_Y",
|
|
f"{camera}:AVG-FIT-SPECTRUM_Y",
|
|
f"{camera}:FIT-SPECTRUM_Y-RAVG"
|
|
# add any others here
|
|
])
|
|
|
|
|
|
# All PVs (original + running average + N-shot average)
|
|
all_pv_names = base_pv_names + ravg_pv_names + avg_pv_names
|
|
|
|
# Start background thread for PV updates
|
|
thread = Thread(target=update_PVs, args=(buffer, *all_pv_names), daemon=True)
|
|
thread.start()
|
|
|
|
def _pv_safe(val, pvname):
|
|
if pvname in ARRAY_PVS:
|
|
# array PV: must always send a 1D numpy array
|
|
if isinstance(val, np.ndarray):
|
|
if val.ndim == 1:
|
|
return val.astype(np.float64)
|
|
elif val.ndim == 0:
|
|
# Scalar array, wrap as 1-element
|
|
return np.array([val.item()], dtype=np.float64)
|
|
else:
|
|
raise ValueError(f"{pvname}: expected 1D array, got shape {val.shape}")
|
|
elif isinstance(val, (list, tuple)):
|
|
return np.array(val, dtype=np.float64)
|
|
elif isinstance(val, (float, int, np.generic)):
|
|
return np.array([val], dtype=np.float64)
|
|
else:
|
|
raise TypeError(f"{pvname}: expected array, got {type(val)}")
|
|
else:
|
|
# scalar PV: must always send float
|
|
if isinstance(val, np.ndarray):
|
|
if val.size == 1:
|
|
return float(val.item())
|
|
else:
|
|
raise ValueError(f"{pvname}: expected scalar, got array of size {val.size}")
|
|
elif isinstance(val, (np.generic, float, int)):
|
|
return float(val)
|
|
else:
|
|
raise TypeError(f"{pvname}: expected scalar, got {type(val)}")
|
|
|
|
|
|
def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None, background=None):
|
|
"""
|
|
Main entrypoint: subtract background, crop ROI, smooth, fit Gaussian,
|
|
compute metrics, N-shot average, queue PV updates (with running averages).
|
|
Returns a dict of processed PV values (original and average channels).
|
|
"""
|
|
global initialized, sent_pid, channel_pv_names
|
|
global global_ravg_length, ravg_buffers, avg_buffer
|
|
try:
|
|
if not initialized:
|
|
initialize(parameters)
|
|
initialized = True
|
|
|
|
camera = parameters["camera_name"]
|
|
|
|
# Dynamic ROI and axis PV read
|
|
ymin_pv, ymax_pv, axis_pv = create_thread_pvs(channel_pv_names)
|
|
if ymin_pv and ymin_pv.connected:
|
|
global_roi[0] = ymin_pv.value
|
|
if ymax_pv and ymax_pv.connected:
|
|
global_roi[1] = ymax_pv.value
|
|
|
|
if not (axis_pv and axis_pv.connected):
|
|
_logger.warning("Energy axis not connected")
|
|
return None
|
|
axis = axis_pv.value[:image.shape[1]]
|
|
|
|
# Preprocess image
|
|
proc_img = image.astype(np.float32) - np.float32(parameters.get("pixel_bkg", 0))
|
|
nrows, _ = proc_img.shape
|
|
|
|
# Background image
|
|
bg_img = parameters.pop('background_data', None)
|
|
bg_img = bg_img.astype(np.float32) if isinstance(bg_img, np.ndarray) and bg_img.shape == proc_img.shape else None
|
|
|
|
# Crop ROI
|
|
ymin, ymax = map(int, global_roi)
|
|
if 0 <= ymin < ymax <= nrows:
|
|
proc_img = proc_img[ymin:ymax, :]
|
|
if bg_img is not None:
|
|
bg_img = bg_img[ymin:ymax, :]
|
|
|
|
# Extract spectrum and fitted spectrum
|
|
spectrum = get_spectrum(proc_img, bg_img) if bg_img is not None else np.sum(proc_img, axis=0)
|
|
smoothed = scipy.signal.savgol_filter(spectrum, 51, 3)
|
|
minimum, maximum = smoothed.min(), smoothed.max()
|
|
amplitude = maximum - minimum
|
|
skip = amplitude <= nrows * 1.5
|
|
offset, amp_fit, center, sigma = functions.gauss_fit_psss(
|
|
smoothed[::2], axis[::2], offset=minimum, amplitude=amplitude, skip=skip, maxfev=10
|
|
)
|
|
# Reconstruct fitted curve
|
|
fit_spectrum = offset + amp_fit * np.exp(-((axis - center)**2) / (2 * sigma**2))
|
|
#fit_spectrum = np.abs(fit_spectrum)
|
|
|
|
# Moments
|
|
sm_norm = smoothed / np.sum(smoothed)
|
|
spect_com = np.sum(axis * sm_norm)
|
|
spect_std = np.sqrt(np.sum((axis - spect_com)**2 * sm_norm))
|
|
spect_skew = np.sum((axis - spect_com)**3 * sm_norm) / (spect_std**3)
|
|
cum = np.cumsum(sm_norm); e25 = np.interp(0.25, cum, axis); e75 = np.interp(0.75, cum, axis)
|
|
spect_iqr = e75 - e25; spect_sum = np.sum(spectrum)
|
|
|
|
# Original result
|
|
result = {
|
|
parameters["e_int_name"]: spectrum,
|
|
parameters["e_axis_name"]: axis,
|
|
f"{camera}:SPECTRUM_Y_SUM": spect_sum,
|
|
f"{camera}:FIT-COM": np.float64(center),
|
|
f"{camera}:FIT-FWHM": np.float64(2.355 * sigma),
|
|
f"{camera}:FIT-RMS": np.float64(sigma),
|
|
f"{camera}:FIT-RES": np.float64(2.355 * sigma / center * 1000),
|
|
f"{camera}:FIT-SPECTRUM_Y": np.float64(fit_spectrum),
|
|
f"{camera}:SPECT-COM": spect_com,
|
|
f"{camera}:SPECT-RMS": spect_std,
|
|
f"{camera}:SPECT-SKEW": spect_skew,
|
|
f"{camera}:SPECT-IQR": spect_iqr,
|
|
f"{camera}:SPECT-RES": np.float64(spect_iqr / spect_com * 1000),
|
|
f"{camera}:processing_parameters": json.dumps({"roi": global_roi})
|
|
}
|
|
|
|
# Rolling averages
|
|
exclude = {parameters["e_int_name"], parameters["e_axis_name"], f"{camera}:processing_parameters"}
|
|
ravg_results = {}
|
|
for base_pv in (pv for pv in base_pv_names if pv not in exclude):
|
|
buf = ravg_buffers.setdefault(base_pv, deque(maxlen=global_ravg_length))
|
|
buf.append(result[base_pv])
|
|
ravg_results[f"{base_pv}-RAVG"] = np.mean(buf)
|
|
|
|
# N-shot average and average fitted spectrum
|
|
avg_buffer.append(spectrum)
|
|
avg_spectrum = np.mean(np.stack(avg_buffer), axis=0)
|
|
avg_spectrum = np.abs(avg_spectrum)
|
|
fit_avg = offset + amp_fit * np.exp(-((axis - center)**2) / (2 * sigma**2)) # using avg fit params below
|
|
sm_avg = scipy.signal.savgol_filter(avg_spectrum, 51, 3)
|
|
min_a, max_a = sm_avg.min(), sm_avg.max()
|
|
amp_a = max_a - min_a; skip_a = amp_a <= nrows * 1.5
|
|
offs_a, amp_fit_a, center_a, sigma_a = functions.gauss_fit_psss(
|
|
sm_avg[::2], axis[::2], offset=min_a, amplitude=amp_a, skip=skip_a, maxfev=10
|
|
)
|
|
fit_avg_spectrum = offs_a + amp_fit_a * np.exp(-((axis - center_a)**2) / (2 * sigma_a**2))
|
|
fit_avg_spectrum = np.abs(fit_avg_spectrum)
|
|
|
|
# Average moments
|
|
sm_norm_a = sm_avg / np.sum(sm_avg)
|
|
spect_com_a = np.sum(axis * sm_norm_a)
|
|
spect_std_a = np.sqrt(np.sum((axis - spect_com_a)**2 * sm_norm_a))
|
|
spect_skew_a = np.sum((axis - spect_com_a)**3 * sm_norm_a) / (spect_std_a**3)
|
|
cum_a = np.cumsum(sm_norm_a); e25_a = np.interp(0.25, cum_a, axis); e75_a = np.interp(0.75, cum_a, axis)
|
|
spect_iqr_a = e75_a - e25_a
|
|
spect_res_a = spect_iqr_a / spect_com_a * 1000
|
|
|
|
avg_results = {
|
|
f"{camera}:AVG-FIT-COM": np.float64(center),
|
|
f"{camera}:AVG-FIT-FWHM": np.float64(2.355 * sigma_a),
|
|
f"{camera}:AVG-FIT-RMS": np.float64(sigma_a),
|
|
f"{camera}:AVG-FIT-RES": np.float64(2.355 * sigma_a / center_a * 1000),
|
|
f"{camera}:AVG-SPECT-COM": spect_com,
|
|
f"{camera}:AVG-SPECT-RMS": spect_std_a,
|
|
f"{camera}:AVG-SPECT-SKEW": spect_skew_a,
|
|
f"{camera}:AVG-SPECT-IQR": spect_iqr_a,
|
|
f"{camera}:AVG-SPECT-RES": np.float64(spect_res_a),
|
|
f"{camera}:AVG-SPECTRUM_Y": avg_spectrum,
|
|
f"{camera}:AVG-FIT-SPECTRUM_Y": fit_avg_spectrum
|
|
}
|
|
|
|
# Merge and queue
|
|
full_results = {**result, **ravg_results, **avg_results}
|
|
if epics_lock.acquire(False):
|
|
try:
|
|
if pulse_id > sent_pid:
|
|
sent_pid = pulse_id
|
|
#entry = tuple(full_results.get(pv) for pv in all_pv_names)
|
|
entry = tuple(_pv_safe(full_results.get(pv), pv) for pv in all_pv_names)
|
|
buffer.append(entry)
|
|
finally:
|
|
epics_lock.release()
|
|
|
|
return full_results
|
|
|
|
except Exception as ex:
|
|
_logger.warning("Processing error: %s", ex)
|
|
return {}
|